"""Physics LLM — a Gradio-native demo. Pick a physics scenario; a fine-tuned `LFM2-350M` (GGUF, Q4_K_M, run on CPU via llama-cpp-python) autoregressively predicts the next frames in a structured text format. Each predicted frame is parsed and rendered to a 2D canvas with matplotlib, streamed live as the rollout proceeds. The model and prompt format follow AlexWortega/PhysicsLLMEngine: the prompt is the scene header + a few seed frames + "Predict next frame:", and the model continues with "Frame N: …\n obj_i: pos=(x,y), vel=(vx,vy), a=…, av=…". """ from __future__ import annotations import glob import io import json import os import re import time from pathlib import Path def _preload_cuda() -> None: """ZeroGPU: the CUDA build of llama-cpp-python needs libcudart/libcublas on the loader path at import time, but they aren't there by default. The pip nvidia-* packages ship the .so's; preload them globally so `import llama_cpp` succeeds. No-op off GPU / when the packages are absent.""" import ctypes try: import nvidia # noqa: F401 base = os.path.dirname(nvidia.__file__) except Exception: return for sub in ("cuda_runtime", "cublas"): for so in sorted(glob.glob(os.path.join(base, sub, "lib", "*.so*"))): try: ctypes.CDLL(so, mode=ctypes.RTLD_GLOBAL) except OSError: pass _preload_cuda() # Persist CUDA's PTX-JIT cache across requests (ZeroGPU frees the GPU between # calls but the container/filesystem persist) so only the first cold request # pays the ~40s kernel-compile cost. os.environ.setdefault("CUDA_CACHE_PATH", "/tmp/cuda_jit_cache") os.environ.setdefault("CUDA_CACHE_MAXSIZE", str(2 * 1024 * 1024 * 1024)) try: import spaces gpu = spaces.GPU except Exception: # local / non-Spaces: make @gpu(...) 
a no-op def gpu(*args, **kwargs): if len(args) == 1 and callable(args[0]) and not kwargs: return args[0] return lambda f: f import gradio as gr import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt from matplotlib.patches import Circle, Rectangle import numpy as np from PIL import Image HERE = Path(__file__).parent EXAMPLES_DIR = HERE / "backend" / "examples" GGUF_REPO = "AlexWortega/lfm2-scenarios-GGUF" GGUF_FILE = "lfm2-scenarios-Q4_K_M.gguf" # Model's native context = 8192. With full-context rollout we keep as many # recent frames as fit; per-frame cost grows as the context fills (linear # prompt-eval per step), so deep rollouts get slower toward the end. N_CTX = 8192 N_THREADS = 2 CTX_MARGIN = 32 # tokens reserved beyond prompt + max_new # ----------------------------------------------------------------------------- # Prompt format (ported from PhysicsLLMEngine/browser_demo/src/promptFormat.ts) # ----------------------------------------------------------------------------- def _f(n: float, d: int) -> str: return f"{n:.{d}f}" def fmt_header(h: dict) -> str: lines = [f"Scene: {h.get('description', '')}"] g = h.get("gravity", {}) or {} lines.append(f"Gravity: ({g.get('x', 0)}, {g.get('y', 0)})") lines.append(f"Timestep: {_f(h.get('timestep', 0.01667), 5)}") if h.get("scenario_type"): lines.append(f"Type: {h['scenario_type']}") if h.get("difficulty") is not None: lines.append(f"Difficulty: {h['difficulty']}") parts = [] for sg in h.get("static_geometry", []) or []: if sg.get("type") == "segment": p1, p2 = sg["p1"], sg["p2"] parts.append( f"seg ({round(p1['x'])},{round(p1['y'])})-({round(p2['x'])},{round(p2['y'])})" ) elif sg.get("type") == "circle": c = sg["center"] parts.append(f"peg ({round(c['x'])},{round(c['y'])}) r={round(sg['radius'])}") if parts: lines.append("Static: " + "; ".join(parts)) constr = h.get("constraints", []) or [] if constr: lines.append( "Constraints: " + "; ".join(f"{c['type']} {c['body_a']}->{c['body_b']}" for c in constr) 
) lines.append("") return "\n".join(lines) def fmt_frame(fr: dict) -> str: lines = [f"Frame {fr['frame']}: {fr.get('description', '')}"] for o in fr["objects"]: p = o["position"] v = o.get("velocity", {"x": 0, "y": 0}) or {"x": 0, "y": 0} a = o.get("angle", 0) or 0 av = o.get("angular_velocity", 0) or 0 s = f" obj_{o['id']}: pos=({_f(p['x'], 4)}, {_f(p['y'], 4)}), vel=({_f(v['x'], 4)}, {_f(v['y'], 4)})" if abs(a) > 0.001 or abs(av) > 0.001: s += f", a={_f(a, 4)}, av={_f(av, 4)}" lines.append(s) lines.append("") return "\n".join(lines) OBJ_RE = re.compile( r"obj_(\d+):\s*pos=\(\s*(-?[\d.]+)\s*,\s*(-?[\d.]+)\s*\),\s*vel=\(\s*(-?[\d.]+)\s*,\s*(-?[\d.]+)\s*\)" r"(?:,\s*a=(-?[\d.]+),\s*av=(-?[\d.]+))?" ) # Anchor on line start (allowing indent) so the "Frame N:" embedded inside a # frame's own description ("Frame 5: Frame 5: 7 of 11 …") is NOT treated as a # new frame boundary — only the real header at the start of a line is. FRAME_RE = re.compile(r"(?:^|\n)[ \t]*Frame\s+\d+:") def split_first_frame(text: str) -> str: m = list(FRAME_RE.finditer(text)) if not m: return text first_end = m[0].end() second_start = m[1].start() if len(m) > 1 else len(text) return text[first_end:second_start] def emitted_description(text: str) -> str: """Pull the model's own description for the frame it just emitted — the content after the first `Frame N:` on its starting line. 
Round-tripping this back into context (instead of a synthetic "simulation in progress") keeps the prompt in-distribution and avoids drift / lost collisions.""" m = re.search(r"^[ \t]*Frame\s+\d+:\s*(.*)", text, re.MULTILINE) return m.group(1).rstrip() if m else "" def parse_frame(text: str, n_obj: int) -> dict[int, dict]: out: dict[int, dict] = {} for m in OBJ_RE.finditer(text): i = int(m.group(1)) if i < n_obj: out[i] = { "id": i, "position": {"x": float(m.group(2)), "y": float(m.group(3))}, "velocity": {"x": float(m.group(4)), "y": float(m.group(5))}, "angle": float(m.group(6)) if m.group(6) else 0.0, "angular_velocity": float(m.group(7)) if m.group(7) else 0.0, } return out # ----------------------------------------------------------------------------- # Pymunk ground-truth rollout (the engine LFM2-scenarios was distilled from). # Generates a deterministic Pymunk simulation from the same starting state the # model gets, so we can render model vs Pymunk side-by-side per frame. # ----------------------------------------------------------------------------- def pymunk_rollout(header: dict, seed_frame: dict, n_frames: int) -> list[dict]: try: import pymunk except Exception as exc: # noqa: BLE001 print(f"[pymunk] unavailable: {exc}", flush=True) return [] g = header.get("gravity", {}) or {} dt = float(header.get("timestep") or (1.0 / 60.0)) space = pymunk.Space() space.gravity = (float(g.get("x", 0.0)), float(g.get("y", 0.0))) for sg in header.get("static_geometry", []) or []: if sg.get("type") == "segment": seg = pymunk.Segment( space.static_body, (sg["p1"]["x"], sg["p1"]["y"]), (sg["p2"]["x"], sg["p2"]["y"]), radius=1.0, ) seg.friction = float(sg.get("friction", 0.5)) seg.elasticity = float(sg.get("elasticity", 0.5)) space.add(seg) elif sg.get("type") == "circle": peg = pymunk.Circle( space.static_body, float(sg.get("radius", 4)), offset=(sg["center"]["x"], sg["center"]["y"]), ) peg.friction = float(sg.get("friction", 0.5)) peg.elasticity = 
float(sg.get("elasticity", 0.5)) space.add(peg) state_by_id = {o["id"]: o for o in (seed_frame.get("objects") or [])} bodies: dict[int, tuple] = {} for ho in header.get("objects", []) or []: oid = ho["id"] st = state_by_id.get(oid, {}) mat = ho.get("material", {}) or {} mass = float(mat.get("mass", 1.0)) if ho["type"] == "circle": r = float(ho.get("radius", 12)) moment = pymunk.moment_for_circle(mass, 0, r) body = pymunk.Body(mass, moment) shape = pymunk.Circle(body, r) else: w, h = float(ho.get("width", 20)), float(ho.get("height", 20)) moment = pymunk.moment_for_box(mass, (w, h)) body = pymunk.Body(mass, moment) shape = pymunk.Poly.create_box(body, (w, h)) pos = st.get("position") or ho.get("position") or {"x": 0, "y": 0} body.position = (float(pos.get("x", 0)), float(pos.get("y", 0))) v = st.get("velocity") or {"x": 0, "y": 0} body.velocity = (float(v.get("x", 0)), float(v.get("y", 0))) body.angle = float(st.get("angle", 0) or 0) body.angular_velocity = float(st.get("angular_velocity", 0) or 0) shape.friction = float(mat.get("friction", 0.5)) shape.elasticity = float(mat.get("elasticity", 0.4)) space.add(body, shape) bodies[oid] = (body, ho) start_idx = int(seed_frame.get("frame", 0)) frames: list[dict] = [] for i in range(1, n_frames + 1): space.step(dt) objs = [] for oid, (body, meta) in sorted(bodies.items()): objs.append({ "id": oid, "type": meta["type"], "position": {"x": float(body.position.x), "y": float(body.position.y)}, "velocity": {"x": float(body.velocity.x), "y": float(body.velocity.y)}, "angle": float(body.angle), "angular_velocity": float(body.angular_velocity), }) frames.append({"frame": start_idx + i, "description": f"Frame {start_idx+i}: pymunk", "objects": objs}) return frames # ----------------------------------------------------------------------------- # Scenarios # ----------------------------------------------------------------------------- def load_scenarios() -> dict[str, dict]: out: dict[str, dict] = {} for p in 
sorted(EXAMPLES_DIR.glob("*.jsonl")): try: lines = [ln for ln in p.read_text().splitlines() if ln.strip()] header = json.loads(lines[0]) frames = [json.loads(ln) for ln in lines[1:] if ln.startswith("{")] initial = frames[:4] # ground_truth = the full Pymunk rollout (the dataset this LFM2 was # distilled from). We render it side-by-side with the model's # rollout so divergence is visible frame-by-frame. out[p.stem] = { "header": header, "initial_frames": initial, "ground_truth": frames, } except Exception as exc: # noqa: BLE001 print(f"[scenarios] skip {p.name}: {exc}", flush=True) return out SCENARIOS = load_scenarios() HELD_OUT = {"pong", "bowling", "ramp_roll", "angry_birds", "hourglass", "newtons_cradle"} # Curated demos that look good in this setup (kept to scenes where the model # was trained and the rollout stays physically plausible for tens of frames). # `bowling` / `newtons_cradle` are held-out so they're more of a stress test # but they're iconic so we keep them. # Vetted by running each through the live API + checking model=N/N with no # 'held' (truncated) objects. Replaced 'dominos' (only emitted ~22/26 obj per # frame, 3 frozen each step) with 'pyramid' (28/28 clean). FEATURED = [s for s in ( "projectile", "pendulum", "billiards", "pyramid", "plinko", "orbit", "bowling", "newtons_cradle", ) if s in SCENARIOS] # ----------------------------------------------------------------------------- # Model (lazy) # ----------------------------------------------------------------------------- def get_llm(log=lambda s: None): # Built fresh each call: ZeroGPU frees the GPU between requests, so a cached # GPU-resident model would be stale. The GGUF stays disk-cached, so only the # (fast) load repeats. 
from huggingface_hub import hf_hub_download from llama_cpp import Llama log("Fetching model (≈216 MB, cached after first run)…") path = hf_hub_download(repo_id=GGUF_REPO, filename=GGUF_FILE) # NB: prompt-lookup speculative decoding (the practical stand-in for EAGLE, # which llama.cpp lacks) was tried but `draft_model` on this CUDA wheel # fails with `llama_decode returned -1` — both with and without flash_attn. # flash_attn alone works and is the win here (~+36% tok/s), so we use that # with a plain fallback. Each config is validated with a real warmup decode. base = dict(model_path=path, n_ctx=N_CTX, n_gpu_layers=-1, n_batch=1024, n_threads=N_THREADS, n_threads_batch=N_THREADS, verbose=False) warmup = "Frame 1:\n obj_0: pos=(1.0, 2.0), vel=(0.0, 0.0)\nFrame 2:" for name, kw in (("flash_attn", dict(base, flash_attn=True)), ("plain", dict(base))): try: log(f"Loading LFM2-350M (GPU) · backend: {name}…") llm = Llama(**kw) # Warmup forces the ~40s CUDA PTX-JIT here (uniform per-frame timing # later) and surfaces any decode-time incompatibility now. We also # bump the batch size when constructing to speed prompt-eval over # large contexts (n_batch=1024 is set in base; could go higher). 
llm.create_completion(warmup, max_tokens=8) log(f"Model ready · backend={name}") return llm except Exception as exc: # noqa: BLE001 log(f"backend {name} failed: {str(exc)[:90]}") raise RuntimeError("no working llama.cpp backend found") # ----------------------------------------------------------------------------- # Rendering # ----------------------------------------------------------------------------- BG = "#0b0f17" WALL = "#5b6677" PEG = "#8a93a6" PALETTE = ["#4ea1ff", "#ff7c5b", "#ffd166", "#06d6a0", "#c77dff", "#ff5dac", "#7ee787", "#f78166", "#79c0ff", "#d2a8ff"] def scene_bounds(header: dict) -> tuple[float, float, float, float]: xs, ys = [], [] for o in header.get("objects", []): xs.append(o["position"]["x"]) ys.append(o["position"]["y"]) for sg in header.get("static_geometry", []) or []: if sg.get("type") == "segment": xs += [sg["p1"]["x"], sg["p2"]["x"]] ys += [sg["p1"]["y"], sg["p2"]["y"]] elif sg.get("type") == "circle": xs.append(sg["center"]["x"]); ys.append(sg["center"]["y"]) if not xs: return 0, 800, 0, 600 pad = 40 return min(xs) - pad, max(xs) + pad, min(ys) - pad, max(ys) + pad def render(header: dict, obj_map: dict[int, dict], bounds, title: str) -> Image.Image: x0, x1, y0, y1 = bounds meta = {o["id"]: o for o in header.get("objects", [])} fig, ax = plt.subplots(figsize=(7.2, 5.4), dpi=100) fig.patch.set_facecolor(BG) ax.set_facecolor(BG) ax.set_xlim(x0, x1); ax.set_ylim(y0, y1) ax.set_aspect("equal"); ax.axis("off") for sg in header.get("static_geometry", []) or []: if sg.get("type") == "segment": ax.plot([sg["p1"]["x"], sg["p2"]["x"]], [sg["p1"]["y"], sg["p2"]["y"]], color=WALL, lw=3, solid_capstyle="round", zorder=1) elif sg.get("type") == "circle": ax.add_patch(Circle((sg["center"]["x"], sg["center"]["y"]), sg["radius"], color=PEG, zorder=1)) for oid, o in sorted(obj_map.items()): m = meta.get(oid, {}) p = o["position"] color = PALETTE[oid % len(PALETTE)] otype = m.get("type", "circle") if otype == "circle": r = m.get("radius", 12) 
ax.add_patch(Circle((p["x"], p["y"]), r, color=color, ec="white", lw=0.6, zorder=3)) else: w = m.get("width", 20); h = m.get("height", 20) ang = np.degrees(o.get("angle", 0) or 0) rect = Rectangle((p["x"] - w / 2, p["y"] - h / 2), w, h, color=color, ec="white", lw=0.6, zorder=3) t = (matplotlib.transforms.Affine2D() .rotate_deg_around(p["x"], p["y"], ang) + ax.transData) rect.set_transform(t) ax.add_patch(rect) ax.set_title(title, color="#c9d1d9", fontsize=11, loc="left", pad=8) fig.tight_layout(pad=0.5) buf = io.BytesIO() fig.savefig(buf, format="png", facecolor=BG) plt.close(fig) buf.seek(0) return Image.open(buf).convert("RGB") # ----------------------------------------------------------------------------- # Prompt fitting (full context — pack as many recent frames as fit in N_CTX) # ----------------------------------------------------------------------------- def _build_prompt(header: dict, frames: list[dict]) -> str: ctx = fmt_header(header) for fr in frames: ctx += fmt_frame(fr) return ctx + "Predict next frame:" def fit_prompt(llm, header: dict, all_frames: list[dict], max_new: int) -> tuple[str, int]: """Pack header + as many recent frames as fit into N_CTX - max_new - margin.""" encode = lambda s: llm.tokenize(s.encode("utf-8"), add_bos=True) # noqa: E731 budget_tokens = N_CTX - max_new - CTX_MARGIN keep = len(all_frames) while keep > 0: prompt = _build_prompt(header, all_frames[-keep:]) if len(encode(prompt)) <= budget_tokens: return prompt, keep keep -= 1 return _build_prompt(header, []), 0 # ----------------------------------------------------------------------------- # Scenario helpers (UI) # ----------------------------------------------------------------------------- def scene_to_json(name: str) -> str: sc = SCENARIOS.get(name) if not sc: return "{}" return json.dumps( {"header": sc["header"], "initial_frames": sc["initial_frames"]}, indent=2, ensure_ascii=False, ) # ----------------------------------------------------------------------------- # 
# Interactive canvas editor (Konva-in-iframe; gradio doesn't sanitize iframe
# srcdoc, so the JS reliably runs and can talk back to a hidden gr.Textbox).
# -----------------------------------------------------------------------------
# NOTE(review): only plain text survives in this template in the copy under
# review; the editor markup/JS and the "__SCENE__" placeholder that
# editor_html() substitutes appear to be missing — restore from upstream.
_EDITOR_IFRAME = r"""
drag · click empty to add · in Velocity mode drag the red dot
⏳ not synced
"""


def editor_html(scene: dict) -> str:
    """Return a gr.HTML value: an iframe whose srcdoc contains the canvas
    editor with the scene baked in as a JS literal."""
    import html as _html

    inner = _EDITOR_IFRAME.replace("__SCENE__", json.dumps(scene))
    srcdoc = _html.escape(inner, quote=True)
    # NOTE(review): as visible here the returned string is empty and `srcdoc`
    # is never embedded; the iframe tag looks stripped — confirm against
    # upstream before relying on the editor.
    return (
        f''
    )


def scene_loaded(name: str) -> tuple[str, str]:
    """Scenario.change/Reset.click → (new editor HTML, new hidden state JSON)."""
    sc = SCENARIOS.get(name) or {"header": {}, "initial_frames": []}
    bundle = {"header": sc["header"], "initial_frames": sc["initial_frames"]}
    return editor_html(bundle), json.dumps(bundle)


# -----------------------------------------------------------------------------
# Numerical evaluation: model rollout vs Pymunk ground truth (position MSE).
# Exposed via the api_name="/evaluate" endpoint so we can benchmark featured
# demos from a script without scraping the UI.
# -----------------------------------------------------------------------------
@gpu(duration=300)
def evaluate(scene_json: str, scenario_name: str, n_frames: int):
    """Roll the model forward greedily and compare positions with Pymunk.

    Args:
        scene_json: JSON bundle with "header" and optional "initial_frames".
        scenario_name: echoed back in the result under "scenario".
        n_frames: number of frames to predict.

    Returns:
        A JSON string with per-frame mean/max Euclidean position error and
        aggregate statistics. Frames with no ground truth (for example when no
        seed frame was supplied) contribute no entries to "per_frame".
    """
    bundle = json.loads(scene_json)
    header = bundle["header"]
    initial = bundle.get("initial_frames") or []
    n_obj = (header.get("object_count")
             or len(header.get("objects", []))
             or (len(initial[0]["objects"]) if initial else 0))
    x0, x1, y0, y1 = scene_bounds(header)
    diag = ((x1 - x0) ** 2 + (y1 - y0) ** 2) ** 0.5

    # Without a seed frame there is no state to start Pymunk from; skip the
    # ground truth instead of failing on initial[-1].
    gt_frames = pymunk_rollout(header, initial[-1], int(n_frames)) if initial else []
    gt_by_frame = {f["frame"]: f for f in gt_frames}

    from llama_cpp import Llama  # noqa: F401  (preload may be required)
    llm = get_llm(lambda s: None)
    budget = int(min(2000, n_obj * 36 + 100))

    rolled: list[dict] = list(initial)
    last_idx = initial[-1]["frame"] if initial else 0
    per_frame: list[dict] = []
    t0 = time.time()
    for _ in range(int(n_frames)):
        prompt, _ctx_frames = fit_prompt(llm, header, rolled, budget)
        # Stop once the header of the frame after the one being predicted
        # starts; a few indices are covered in case numbering drifts.
        next_idx = last_idx + 2
        stops = [f"Frame {next_idx+d}:" for d in range(0, 4)]
        out = llm.create_completion(prompt, max_tokens=budget, temperature=0.0,
                                    top_p=0.95, stop=stops)
        text = out["choices"][0]["text"]
        parsed = parse_frame(split_first_frame(text), n_obj)
        modeled = len(parsed)
        prev_objs = {o["id"]: o for o in rolled[-1]["objects"]} if rolled else {}
        new_objs = dict(parsed) if parsed else dict(prev_objs)
        if modeled < n_obj:
            # Objects the model did not emit keep their previous state.
            for oid, o in prev_objs.items():
                new_objs.setdefault(oid, o)
        last_idx += 1
        rolled.append({
            "frame": last_idx,
            "description": emitted_description(text) or f"Frame {last_idx}: simulation in progress.",
            "objects": list(new_objs.values()),
        })
        gt = gt_by_frame.get(last_idx)
        if gt:
            gt_pos = {o["id"]: o["position"] for o in gt["objects"]}
            errs = []
            for oid, o in new_objs.items():
                if oid in gt_pos:
                    dx = gt_pos[oid]["x"] - o["position"]["x"]
                    dy = gt_pos[oid]["y"] - o["position"]["y"]
                    errs.append((dx * dx + dy * dy) ** 0.5)
            per_frame.append({
                "frame": last_idx,
                "modeled": modeled,
                "mean_dist": (sum(errs) / len(errs)) if errs else None,
                "max_dist": max(errs) if errs else None,
            })

    valid = [p for p in per_frame if p["mean_dist"] is not None]
    mean_dist = (sum(p["mean_dist"] for p in valid) / len(valid)) if valid else None
    return json.dumps({
        "scenario": scenario_name,
        "n_obj": n_obj,
        "scene_diag": diag,
        "frames_done": len(per_frame),
        "frames_held_avg": sum(n_obj - p["modeled"] for p in per_frame) / max(1, len(per_frame)),
        "mean_dist": mean_dist,
        # Compare against None explicitly: an exact 0.0 error is a valid
        # result and must be reported as 0 %, not as missing.
        "mean_dist_pct_diag": (mean_dist / diag * 100.0) if (mean_dist is not None and diag) else None,
        "elapsed": round(time.time() - t0, 2),
        "per_frame": per_frame,
    })


# -----------------------------------------------------------------------------
# Simulation (streamed)
# -----------------------------------------------------------------------------
@gpu(duration=300)
def simulate(scene_json: str, scenario_name: str, n_frames: int, temperature: float):
    """Stream a model rollout next to a Pymunk rollout of the same scene.

    A generator: each item is (model image, ground-truth image or None,
    GIF path or None, status text). The GIF path is set only on the final
    item, and only when more than one frame was rendered. `scenario_name` is
    accepted for the UI wiring but not read here.
    """
    log_lines: list[str] = []

    def log(s: str):
        log_lines.append(s)
        print("[sim]", s, flush=True)

    try:
        bundle = json.loads(scene_json)
        header = bundle["header"]
        initial = bundle.get("initial_frames") or []
    except Exception as exc:  # noqa: BLE001
        yield None, None, None, f"Scene JSON parse error: {exc}"
        return

    n_obj = (header.get("object_count")
             or len(header.get("objects", []))
             or (len(initial[0]["objects"]) if initial else 0))
    bounds = scene_bounds(header)
    title = header.get("scenario_type") or header.get("description", "scene")[:32]
    log(f"Scene: {title} · {n_obj} objects · {len(initial)} seed frames")

    # Pymunk ground truth — generated fresh from THE ACTUAL scene state (same
    # initial conditions the model gets), so the side-by-side comparison is
    # apples-to-apples even after the user edits the canvas.
    gt_by_frame: dict[int, dict] = {}
    if initial:
        gt_n = int(n_frames) + 1
        t_gt = time.time()
        try:
            gt_frames = pymunk_rollout(header, initial[-1], gt_n)
            for f in gt_frames:
                gt_by_frame[f["frame"]] = f
            log(f"Pymunk ground truth: {len(gt_by_frame)} frames in {time.time()-t_gt:.2f}s")
        except Exception as exc:  # noqa: BLE001
            log(f"Pymunk rollout failed ({exc}); GT panel disabled")

    rolled_frames: list[dict] = list(initial)
    last_idx = initial[-1]["frame"] if initial else 0
    gif_frames: list[Image.Image] = []

    def render_model() -> Image.Image:
        last = rolled_frames[-1] if rolled_frames else {"objects": []}
        obj_map = {o["id"]: o for o in last.get("objects", [])}
        return render(header, obj_map, bounds, f"Model · frame {last_idx}")

    def render_truth() -> Image.Image | None:
        gt = gt_by_frame.get(last_idx)
        if not gt:
            return None
        obj_map = {o["id"]: o for o in gt.get("objects", [])}
        return render(header, obj_map, bounds, f"Pymunk · frame {last_idx}")

    if rolled_frames:
        gif_frames.append(render_model())
        yield gif_frames[-1], render_truth(), None, "Loading model…\n" + "\n".join(log_lines[-12:])

    try:
        llm = get_llm(log)
    except Exception as exc:  # noqa: BLE001
        yield (gif_frames[-1] if gif_frames else None), None, None, f"Model load failed: {exc}"
        return

    # ≈ one object line per ~28 tokens (with angle/av fields). For dense
    # scenes (30-40 objects) we need ~1300+ tokens for a single complete
    # frame; a tight cap silently truncates the tail and the parser then
    # holds those objects from the prior frame (user sees "frozen balls").
    # Budget linearly with n_obj; cap so we don't waste GPU on huge scenes.
    budget = int(min(2000, n_obj * 36 + 100))

    t0 = time.time()
    for step in range(int(n_frames)):
        prompt, ctx_frames = fit_prompt(llm, header, rolled_frames, budget)
        # Dynamic stop: the model emits "Frame K: ...\n obj_…\nFrame K+1:…".
        # Stopping as soon as the *next* frame header starts saves the wasted
        # generation that bleeds into frame K+1. Cover a few likely indices in
        # case the model drifts by ±1.
        next_idx = last_idx + 2
        stops = [f"Frame {next_idx+d}:" for d in range(0, 4)]
        try:
            out = llm.create_completion(
                prompt, max_tokens=budget,
                temperature=max(0.0, float(temperature)),
                top_p=0.95, stop=stops,
            )
            text = out["choices"][0]["text"]
        except Exception:  # noqa: BLE001
            import traceback
            tb = traceback.format_exc()
            log(f"generation error at step {step+1}")
            yield (gif_frames[-1] if gif_frames else None), None, None, "GENERATION ERROR:\n" + tb[-1500:]
            return

        parsed = parse_frame(split_first_frame(text), n_obj)
        modeled = len(parsed)
        new_objs = dict(parsed)
        prev_objs = {o["id"]: o for o in rolled_frames[-1]["objects"]} if rolled_frames else {}
        if not new_objs:
            new_objs = dict(prev_objs)
        elif modeled < n_obj:
            for oid, o in prev_objs.items():
                new_objs.setdefault(oid, o)

        last_idx += 1
        emitted = emitted_description(text)
        # Training format: description literally starts with "Frame N:" — keep
        # the model's own emitted text so the round-tripped context matches.
        desc = emitted or f"Frame {last_idx}: simulation in progress."
        rolled_frames.append({
            "frame": last_idx,
            "description": desc,
            "objects": list(new_objs.values()),
        })

        elapsed = time.time() - t0
        fps = (step + 1) / max(elapsed, 1e-3)
        # Show modeled vs final separately: if modeled < n_obj, some objects
        # were held from the previous frame (model didn't get to emit them).
        held = n_obj - modeled
        held_note = f" ({held} held)" if held > 0 else ""
        log(f"step {step+1}/{int(n_frames)}: model={modeled}/{n_obj}{held_note} · ctx={ctx_frames}f · {elapsed:.1f}s · {fps:.2f} frame/s")
        gif_frames.append(render_model())
        status = f"Simulating… frame {step+1}/{int(n_frames)}\n" + "\n".join(log_lines[-12:])
        yield gif_frames[-1], render_truth(), None, status

    gif_path = None
    if len(gif_frames) > 1:
        gif_path = str(HERE / "rollout.gif")
        gif_frames[0].save(gif_path, save_all=True, append_images=gif_frames[1:],
                           duration=60, loop=0)
    log(f"Done — {len(gif_frames)} frames in {time.time()-t0:.1f}s")
    # Nothing is rendered when there are no seed frames and zero frames were
    # requested; guard the index like every other yield in this function.
    final_img = gif_frames[-1] if gif_frames else None
    yield final_img, render_truth(), gif_path, "Done.\n" + "\n".join(log_lines[-12:])


# -----------------------------------------------------------------------------
# UI
# -----------------------------------------------------------------------------
_DEFAULT = "bowling" if "bowling" in SCENARIOS else (sorted(SCENARIOS)[0] if SCENARIOS else None)
_DEFAULT_BUNDLE = {
    "header": SCENARIOS[_DEFAULT]["header"],
    "initial_frames": SCENARIOS[_DEFAULT]["initial_frames"],
} if _DEFAULT else {"header": {}, "initial_frames": []}

# NOTE(review): the source's indentation was lost, so the container nesting
# below is reconstructed from widget order — confirm the layout upstream.
with gr.Blocks(title="Physics LLM 🪀") as demo:
    gr.Markdown(
        "# Physics LLM 🪀\n"
        "A fine-tuned **LFM2-350M** predicts 2D rigid-body physics frame-by-frame, "
        "as text — no physics engine. Pick a preset, **drag the balls around or "
        "drop new ones** on the canvas, then watch the model roll it forward live. "
        "The model sees as much of the prior trajectory as fits its 8192-token "
        "context.\n\n"
        "Six scenarios (`pong`, `bowling`, `ramp_roll`, `angry_birds`, `hourglass`, "
        "`newtons_cradle`) were **never seen in training**."
    )
    gr.Markdown(
        "**✨ Featured demos** (model handles these cleanly) — click to load:"
    )
    featured = gr.Radio(
        choices=FEATURED,
        value="bowling" if "bowling" in FEATURED else (FEATURED[0] if FEATURED else None),
        label="",
        show_label=False,
    )
    with gr.Row():
        scenario = gr.Dropdown(
            choices=sorted(SCENARIOS.keys()),
            value=_DEFAULT,
            label="All 30 scenarios",
            scale=4,
        )
        reset = gr.Button("Reset to preset", scale=1)

    scene_html = gr.HTML(value=editor_html(_DEFAULT_BUNDLE), sanitize_html=False)
    scene_state = gr.Textbox(
        value=json.dumps(_DEFAULT_BUNDLE),
        lines=2,
        max_lines=4,
        label="Scene state (auto-synced from canvas; Simulate reads this)",
        elem_id="ph-scene-state",
    )

    with gr.Row():
        with gr.Column(scale=1):
            n_frames = gr.Slider(5, 200, value=60, step=1, label="Frames to predict")
            temperature = gr.Slider(0.0, 1.0, value=0.0, step=0.05, label="Temperature (0 = greedy)")
            run = gr.Button("▶ Simulate", variant="primary")
            gif = gr.Image(label="Replay (animated, model)", type="filepath", height=200)
        with gr.Column(scale=3):
            with gr.Row():
                view = gr.Image(label="Model prediction", height=380)
                view_truth = gr.Image(label="Pymunk ground truth (distilled from)", height=380)
            status = gr.Textbox(label="Log", lines=12, max_lines=12)

    # Featured radio mirrors the dropdown + repaints the editor + state.
    def _pick_featured(name):
        html, state = scene_loaded(name)
        return name, html, state

    featured.change(_pick_featured, [featured], [scenario, scene_html, scene_state])
    scenario.change(scene_loaded, [scenario], [scene_html, scene_state])
    reset.click(scene_loaded, [scenario], [scene_html, scene_state])
    run.click(simulate, [scene_state, scenario, n_frames, temperature],
              [view, view_truth, gif, status])

    with gr.Accordion("📊 Compute position MSE vs Pymunk (numerical)", open=False):
        with gr.Row():
            eval_frames = gr.Slider(5, 30, value=15, step=1, label="Frames to evaluate")
            eval_btn = gr.Button("Run evaluation", scale=1)
        eval_out = gr.Code(language="json", lines=12, label="Result")
    eval_btn.click(evaluate, [scene_state, scenario, eval_frames], [eval_out])


if __name__ == "__main__":
    demo.launch()