"""Physics LLM — a Gradio-native demo.

Pick a physics scenario; a fine-tuned `LFM2-350M` (GGUF, Q4_K_M, run on CPU via
llama-cpp-python) autoregressively predicts the next frames in a structured
text format. Each predicted frame is parsed and rendered to a 2D canvas with
matplotlib, streamed live as the rollout proceeds.

The model and prompt format follow AlexWortega/PhysicsLLMEngine: the prompt is
the scene header + a few seed frames + "Predict next frame:", and the model
continues with "Frame N: …\n obj_i: pos=(x,y), vel=(vx,vy), a=…, av=…".
"""
# NOTE(review): the docstring says "run on CPU", but get_llm() below requests
# n_gpu_layers=-1 and the module preloads CUDA libraries — confirm which is
# current and update the wording.
from __future__ import annotations

import glob
import io
import json
import os
import re
import time
from pathlib import Path


def _preload_cuda() -> None:
    """ZeroGPU: the CUDA build of llama-cpp-python needs libcudart/libcublas on
    the loader path at import time, but they aren't there by default. The pip
    nvidia-* packages ship the .so's; preload them globally so
    `import llama_cpp` succeeds. No-op off GPU / when the packages are absent."""
    import ctypes
    try:
        import nvidia  # noqa: F401
        base = os.path.dirname(nvidia.__file__)
    except Exception:
        # Package missing (or has no usable __file__): nothing to preload.
        return
    for sub in ("cuda_runtime", "cublas"):
        for so in sorted(glob.glob(os.path.join(base, sub, "lib", "*.so*"))):
            try:
                ctypes.CDLL(so, mode=ctypes.RTLD_GLOBAL)
            except OSError:
                # Deliberately best-effort: a library that fails to load is
                # skipped so the remaining ones still get a chance.
                pass


_preload_cuda()

# Persist CUDA's PTX-JIT cache across requests (ZeroGPU frees the GPU between
# calls but the container/filesystem persist) so only the first cold request
# pays the ~40s kernel-compile cost.
os.environ.setdefault("CUDA_CACHE_PATH", "/tmp/cuda_jit_cache")
os.environ.setdefault("CUDA_CACHE_MAXSIZE", str(2 * 1024 * 1024 * 1024))

try:
    import spaces
    gpu = spaces.GPU
except Exception:  # local / non-Spaces: make @gpu(...) a no-op
    def gpu(*args, **kwargs):
        """Stand-in for `spaces.GPU` usable both as `@gpu` and `@gpu(...)`."""
        if len(args) == 1 and callable(args[0]) and not kwargs:
            return args[0]
        return lambda f: f

import gradio as gr
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.patches import Circle, Rectangle
import numpy as np
from PIL import Image

HERE = Path(__file__).parent
EXAMPLES_DIR = HERE / "backend" / "examples"

GGUF_REPO = "AlexWortega/lfm2-scenarios-GGUF"
GGUF_FILE = "lfm2-scenarios-Q4_K_M.gguf"
# Model's native context = 8192. With full-context rollout we keep as many
# recent frames as fit; per-frame cost grows as the context fills (linear
# prompt-eval per step), so deep rollouts get slower toward the end.
N_CTX = 8192
N_THREADS = 2
CTX_MARGIN = 32  # tokens reserved beyond prompt + max_new


# -----------------------------------------------------------------------------
# Prompt format (ported from PhysicsLLMEngine/browser_demo/src/promptFormat.ts)
# -----------------------------------------------------------------------------
def _f(n: float, d: int) -> str:
    """Format `n` as fixed-point with `d` decimal places."""
    return f"{n:.{d}f}"


def fmt_header(h: dict) -> str:
    """Render the scene header dict as the text block that opens a prompt.

    Always emits the Scene / Gravity / Timestep lines; Type, Difficulty,
    Static and Constraints lines are added only when the header carries them.
    The result ends with a newline.
    """
    lines = [f"Scene: {h.get('description', '')}"]
    g = h.get("gravity", {}) or {}
    lines.append(f"Gravity: ({g.get('x', 0)}, {g.get('y', 0)})")
    lines.append(f"Timestep: {_f(h.get('timestep', 0.01667), 5)}")
    if h.get("scenario_type"):
        lines.append(f"Type: {h['scenario_type']}")
    if h.get("difficulty") is not None:
        lines.append(f"Difficulty: {h['difficulty']}")

    parts = []
    for sg in h.get("static_geometry", []) or []:
        if sg.get("type") == "segment":
            p1, p2 = sg["p1"], sg["p2"]
            parts.append(
                f"seg ({round(p1['x'])},{round(p1['y'])})-({round(p2['x'])},{round(p2['y'])})"
            )
        elif sg.get("type") == "circle":
            c = sg["center"]
            parts.append(f"peg ({round(c['x'])},{round(c['y'])}) r={round(sg['radius'])}")
    if parts:
        lines.append("Static: " + "; ".join(parts))

    constr = h.get("constraints", []) or []
    if constr:
        lines.append(
            "Constraints: "
            + "; ".join(f"{c['type']} {c['body_a']}->{c['body_b']}" for c in constr)
        )
    lines.append("")
    return "\n".join(lines)


def fmt_frame(fr: dict) -> str:
    """Render one frame dict as a `Frame N: …` line plus one line per object.

    Angle and angular velocity are appended to an object's line only when
    either exceeds 0.001 in magnitude. The result ends with a newline.
    """
    lines = [f"Frame {fr['frame']}: {fr.get('description', '')}"]
    for o in fr["objects"]:
        p = o["position"]
        v = o.get("velocity", {"x": 0, "y": 0}) or {"x": 0, "y": 0}
        a = o.get("angle", 0) or 0
        av = o.get("angular_velocity", 0) or 0
        s = f" obj_{o['id']}: pos=({_f(p['x'], 4)}, {_f(p['y'], 4)}), vel=({_f(v['x'], 4)}, {_f(v['y'], 4)})"
        if abs(a) > 0.001 or abs(av) > 0.001:
            s += f", a={_f(a, 4)}, av={_f(av, 4)}"
        lines.append(s)
    lines.append("")
    return "\n".join(lines)


OBJ_RE = re.compile(
    r"obj_(\d+):\s*pos=\(\s*(-?[\d.]+)\s*,\s*(-?[\d.]+)\s*\),\s*vel=\(\s*(-?[\d.]+)\s*,\s*(-?[\d.]+)\s*\)"
    r"(?:,\s*a=(-?[\d.]+),\s*av=(-?[\d.]+))?"
)
# Anchor on line start (allowing indent) so the "Frame N:" embedded inside a
# frame's own description ("Frame 5: Frame 5: 7 of 11 …") is NOT treated as a
# new frame boundary — only the real header at the start of a line is.
FRAME_RE = re.compile(r"(?:^|\n)[ \t]*Frame\s+\d+:")
# Description = rest of the header line. Only spaces/tabs are skipped after the
# colon: `\s*` would also swallow the newline, so an empty description would
# capture the following object line instead.
_DESC_RE = re.compile(r"^[ \t]*Frame\s+\d+:[ \t]*(.*)", re.MULTILINE)


def split_first_frame(text: str) -> str:
    """Return the body of the first frame in `text`.

    That is the text after the first line-anchored `Frame N:` header up to the
    next such header (or the end of `text`). If no header is found, `text` is
    returned unchanged.
    """
    m = list(FRAME_RE.finditer(text))
    if not m:
        return text
    first_end = m[0].end()
    second_start = m[1].start() if len(m) > 1 else len(text)
    return text[first_end:second_start]


def emitted_description(text: str) -> str:
    """Pull the model's own description for the frame it just emitted — the
    content after the first `Frame N:` on its starting line.

    Round-tripping this back into context (instead of a synthetic "simulation
    in progress") keeps the prompt in-distribution and avoids drift / lost
    collisions.

    Returns "" when there is no frame header or the header line carries no
    description; the match never spills onto the next line.
    """
    m = _DESC_RE.search(text)
    return m.group(1).rstrip() if m else ""


def parse_frame(text: str, n_obj: int) -> dict[int, dict]:
    """Parse `obj_i: pos=(…), vel=(…)[, a=…, av=…]` entries out of `text`.

    Returns a mapping from object id to its state dict. Entries whose id is
    >= `n_obj` are ignored, and a later entry for the same id overwrites an
    earlier one. Entries whose numeric fields are malformed (the pattern
    accepts tokens such as "1.2.3" that `float` rejects) are skipped rather
    than raising, so they look the same as objects the model never emitted.
    """
    out: dict[int, dict] = {}
    for m in OBJ_RE.finditer(text):
        i = int(m.group(1))
        if i >= n_obj:
            continue
        try:
            x, y, vx, vy = (float(m.group(k)) for k in range(2, 6))
            angle = float(m.group(6)) if m.group(6) else 0.0
            ang_vel = float(m.group(7)) if m.group(7) else 0.0
        except ValueError:
            continue
        out[i] = {
            "id": i,
            "position": {"x": x, "y": y},
            "velocity": {"x": vx, "y": vy},
            "angle": angle,
            "angular_velocity": ang_vel,
        }
    return out


# -----------------------------------------------------------------------------
# Pymunk ground-truth rollout (the engine LFM2-scenarios was distilled from).
# Generates a deterministic Pymunk simulation from the same starting state the
# model gets, so we can render model vs Pymunk side-by-side per frame.
# -----------------------------------------------------------------------------
def pymunk_rollout(header: dict, seed_frame: dict, n_frames: int) -> list[dict]:
    """Simulate `n_frames` steps with Pymunk starting from `seed_frame`.

    Static geometry and per-object shape/material come from `header`; initial
    position, velocity, angle and angular velocity come from `seed_frame`
    (falling back to the header position, then to zeros). Frame numbers
    continue from `seed_frame["frame"]`.

    Returns a list of frame dicts, or an empty list when pymunk cannot be
    imported.
    """
    try:
        import pymunk
    except Exception as exc:  # noqa: BLE001
        print(f"[pymunk] unavailable: {exc}", flush=True)
        return []

    g = header.get("gravity", {}) or {}
    dt = float(header.get("timestep") or (1.0 / 60.0))
    space = pymunk.Space()
    space.gravity = (float(g.get("x", 0.0)), float(g.get("y", 0.0)))

    for sg in header.get("static_geometry", []) or []:
        if sg.get("type") == "segment":
            seg = pymunk.Segment(
                space.static_body,
                (sg["p1"]["x"], sg["p1"]["y"]),
                (sg["p2"]["x"], sg["p2"]["y"]),
                radius=1.0,
            )
            seg.friction = float(sg.get("friction", 0.5))
            seg.elasticity = float(sg.get("elasticity", 0.5))
            space.add(seg)
        elif sg.get("type") == "circle":
            peg = pymunk.Circle(
                space.static_body,
                float(sg.get("radius", 4)),
                offset=(sg["center"]["x"], sg["center"]["y"]),
            )
            peg.friction = float(sg.get("friction", 0.5))
            peg.elasticity = float(sg.get("elasticity", 0.5))
            space.add(peg)

    state_by_id = {o["id"]: o for o in (seed_frame.get("objects") or [])}
    bodies: dict[int, tuple] = {}
    for ho in header.get("objects", []) or []:
        oid = ho["id"]
        st = state_by_id.get(oid, {})
        mat = ho.get("material", {}) or {}
        mass = float(mat.get("mass", 1.0))
        if ho["type"] == "circle":
            r = float(ho.get("radius", 12))
            moment = pymunk.moment_for_circle(mass, 0, r)
            body = pymunk.Body(mass, moment)
            shape = pymunk.Circle(body, r)
        else:
            # Anything that is not a circle is simulated as a box.
            w, h = float(ho.get("width", 20)), float(ho.get("height", 20))
            moment = pymunk.moment_for_box(mass, (w, h))
            body = pymunk.Body(mass, moment)
            shape = pymunk.Poly.create_box(body, (w, h))
        pos = st.get("position") or ho.get("position") or {"x": 0, "y": 0}
        body.position = (float(pos.get("x", 0)), float(pos.get("y", 0)))
        v = st.get("velocity") or {"x": 0, "y": 0}
        body.velocity = (float(v.get("x", 0)), float(v.get("y", 0)))
        body.angle = float(st.get("angle", 0) or 0)
        body.angular_velocity = float(st.get("angular_velocity", 0) or 0)
        shape.friction = float(mat.get("friction", 0.5))
        shape.elasticity = float(mat.get("elasticity", 0.4))
        space.add(body, shape)
        bodies[oid] = (body, ho)

    start_idx = int(seed_frame.get("frame", 0))
    frames: list[dict] = []
    for i in range(1, n_frames + 1):
        space.step(dt)
        objs = []
        for oid, (body, meta) in sorted(bodies.items()):
            objs.append({
                "id": oid,
                "type": meta["type"],
                "position": {"x": float(body.position.x), "y": float(body.position.y)},
                "velocity": {"x": float(body.velocity.x), "y": float(body.velocity.y)},
                "angle": float(body.angle),
                "angular_velocity": float(body.angular_velocity),
            })
        frames.append({"frame": start_idx + i,
                       "description": f"Frame {start_idx+i}: pymunk",
                       "objects": objs})
    return frames


# -----------------------------------------------------------------------------
# Scenarios
# -----------------------------------------------------------------------------
def load_scenarios() -> dict[str, dict]:
    """Load every `*.jsonl` scenario under `EXAMPLES_DIR`, keyed by file stem.

    The first non-blank line of a file is the header; each following line that
    starts with "{" is a frame. Files that fail to read or parse are skipped
    with a printed message rather than aborting the load.
    """
    out: dict[str, dict] = {}
    for p in sorted(EXAMPLES_DIR.glob("*.jsonl")):
        try:
            # Explicit UTF-8: JSON text should not depend on the platform's
            # default locale encoding.
            text = p.read_text(encoding="utf-8")
            lines = [ln for ln in text.splitlines() if ln.strip()]
            header = json.loads(lines[0])
            frames = [json.loads(ln) for ln in lines[1:] if ln.startswith("{")]
            initial = frames[:4]
            # ground_truth = the full Pymunk rollout (the dataset this LFM2 was
            # distilled from). We render it side-by-side with the model's
            # rollout so divergence is visible frame-by-frame.
            out[p.stem] = {
                "header": header,
                "initial_frames": initial,
                "ground_truth": frames,
            }
        except Exception as exc:  # noqa: BLE001
            print(f"[scenarios] skip {p.name}: {exc}", flush=True)
    return out


SCENARIOS = load_scenarios()
HELD_OUT = {"pong", "bowling", "ramp_roll", "angry_birds", "hourglass", "newtons_cradle"}

# Curated demos that look good in this setup (kept to scenes where the model
# was trained and the rollout stays physically plausible for tens of frames).
# `bowling` / `newtons_cradle` are held-out so they're more of a stress test
# but they're iconic so we keep them.
# Vetted by running each through the live API + checking model=N/N with no
# 'held' (truncated) objects. Replaced 'dominos' (only emitted ~22/26 obj per
# frame, 3 frozen each step) with 'pyramid' (28/28 clean).
FEATURED = [s for s in (
    "projectile", "pendulum", "billiards", "pyramid",
    "plinko", "orbit", "bowling", "newtons_cradle",
) if s in SCENARIOS]


# -----------------------------------------------------------------------------
# Model (lazy)
# -----------------------------------------------------------------------------
def get_llm(log=lambda s: None):
    """Download (if needed) and load the GGUF model, returning a ready `Llama`.

    Tries the flash-attention configuration first, then a plain one; each is
    validated with a short warmup completion. `log` is called with
    human-readable progress strings.

    Raises:
        RuntimeError: if no configuration loads and completes the warmup; the
            last backend failure is attached as the cause.
    """
    # Built fresh each call: ZeroGPU frees the GPU between requests, so a cached
    # GPU-resident model would be stale. The GGUF stays disk-cached, so only the
    # (fast) load repeats.
    from huggingface_hub import hf_hub_download
    from llama_cpp import Llama

    log("Fetching model (≈216 MB, cached after first run)…")
    path = hf_hub_download(repo_id=GGUF_REPO, filename=GGUF_FILE)
    # NB: prompt-lookup speculative decoding (the practical stand-in for EAGLE,
    # which llama.cpp lacks) was tried but `draft_model` on this CUDA wheel
    # fails with `llama_decode returned -1` — both with and without flash_attn.
    # flash_attn alone works and is the win here (~+36% tok/s), so we use that
    # with a plain fallback. Each config is validated with a real warmup decode.
    base = dict(model_path=path, n_ctx=N_CTX, n_gpu_layers=-1, n_batch=1024,
                n_threads=N_THREADS, n_threads_batch=N_THREADS, verbose=False)
    warmup = "Frame 1:\n obj_0: pos=(1.0, 2.0), vel=(0.0, 0.0)\nFrame 2:"
    last_exc: Exception | None = None
    for name, kw in (("flash_attn", dict(base, flash_attn=True)),
                     ("plain", dict(base))):
        try:
            log(f"Loading LFM2-350M (GPU) · backend: {name}…")
            llm = Llama(**kw)
            # Warmup forces the ~40s CUDA PTX-JIT here (uniform per-frame timing
            # later) and surfaces any decode-time incompatibility now. We also
            # bump the batch size when constructing to speed prompt-eval over
            # large contexts (n_batch=1024 is set in base; could go higher).
            llm.create_completion(warmup, max_tokens=8)
            log(f"Model ready · backend={name}")
            return llm
        except Exception as exc:  # noqa: BLE001
            last_exc = exc
            log(f"backend {name} failed: {str(exc)[:90]}")
    # Keep the underlying failure reachable via __cause__ for debugging.
    raise RuntimeError("no working llama.cpp backend found") from last_exc


# -----------------------------------------------------------------------------
# Rendering
# -----------------------------------------------------------------------------
BG = "#0b0f17"
WALL = "#5b6677"
PEG = "#8a93a6"
PALETTE = ["#4ea1ff", "#ff7c5b", "#ffd166", "#06d6a0", "#c77dff",
           "#ff5dac", "#7ee787", "#f78166", "#79c0ff", "#d2a8ff"]


def scene_bounds(header: dict) -> tuple[float, float, float, float]:
    """Return `(x_min, x_max, y_min, y_max)` covering the scene, padded by 40.

    The extent is taken from the header's object positions, segment endpoints
    and static-circle centres. With no such points the default
    `(0, 800, 0, 600)` is returned.
    """
    xs, ys = [], []
    for o in header.get("objects", []) or []:
        xs.append(o["position"]["x"])
        ys.append(o["position"]["y"])
    for sg in header.get("static_geometry", []) or []:
        if sg.get("type") == "segment":
            xs += [sg["p1"]["x"], sg["p2"]["x"]]
            ys += [sg["p1"]["y"], sg["p2"]["y"]]
        elif sg.get("type") == "circle":
            xs.append(sg["center"]["x"])
            ys.append(sg["center"]["y"])
    if not xs:
        return 0, 800, 0, 600
    pad = 40
    return min(xs) - pad, max(xs) + pad, min(ys) - pad, max(ys) + pad


def render(header: dict, obj_map: dict[int, dict], bounds, title: str) -> Image.Image:
    """Draw one frame to an RGB `PIL.Image`.

    Static geometry comes from `header`; dynamic objects come from `obj_map`
    (id -> state with at least a "position"). Shape type and size are looked
    up in the header by id, defaulting to a radius-12 circle. `bounds` is
    `(x0, x1, y0, y1)` for the axes limits and `title` is drawn top-left.
    """
    x0, x1, y0, y1 = bounds
    meta = {o["id"]: o for o in header.get("objects", []) or []}
    fig, ax = plt.subplots(figsize=(7.2, 5.4), dpi=100)
    try:
        fig.patch.set_facecolor(BG)
        ax.set_facecolor(BG)
        ax.set_xlim(x0, x1)
        ax.set_ylim(y0, y1)
        ax.set_aspect("equal")
        ax.axis("off")

        for sg in header.get("static_geometry", []) or []:
            if sg.get("type") == "segment":
                ax.plot([sg["p1"]["x"], sg["p2"]["x"]],
                        [sg["p1"]["y"], sg["p2"]["y"]],
                        color=WALL, lw=3, solid_capstyle="round", zorder=1)
            elif sg.get("type") == "circle":
                ax.add_patch(Circle((sg["center"]["x"], sg["center"]["y"]),
                                    sg["radius"], color=PEG, zorder=1))

        for oid, o in sorted(obj_map.items()):
            m = meta.get(oid, {})
            p = o["position"]
            color = PALETTE[oid % len(PALETTE)]
            otype = m.get("type", "circle")
            if otype == "circle":
                r = m.get("radius", 12)
                ax.add_patch(Circle((p["x"], p["y"]), r, color=color,
                                    ec="white", lw=0.6, zorder=3))
            else:
                w = m.get("width", 20)
                h = m.get("height", 20)
                ang = np.degrees(o.get("angle", 0) or 0)
                rect = Rectangle((p["x"] - w / 2, p["y"] - h / 2), w, h,
                                 color=color, ec="white", lw=0.6, zorder=3)
                # Rotate about the body centre in data space, then map to
                # display coordinates.
                t = (matplotlib.transforms.Affine2D()
                     .rotate_deg_around(p["x"], p["y"], ang) + ax.transData)
                rect.set_transform(t)
                ax.add_patch(rect)

        ax.set_title(title, color="#c9d1d9", fontsize=11, loc="left", pad=8)
        fig.tight_layout(pad=0.5)
        buf = io.BytesIO()
        fig.savefig(buf, format="png", facecolor=BG)
    finally:
        # Always release the figure, even if drawing/saving raised; pyplot
        # keeps figures alive until they are closed.
        plt.close(fig)
    buf.seek(0)
    return Image.open(buf).convert("RGB")


# -----------------------------------------------------------------------------
# Prompt fitting (full context — pack as many recent frames as fit in N_CTX)
# -----------------------------------------------------------------------------
def _build_prompt(header: dict, frames: list[dict]) -> str:
    """Concatenate the header, the given frames and the prediction cue."""
    return (
        fmt_header(header)
        + "".join(fmt_frame(fr) for fr in frames)
        + "Predict next frame:"
    )


def fit_prompt(llm, header: dict, all_frames: list[dict], max_new: int) -> tuple[str, int]:
    """Pack header + as many recent frames as fit into N_CTX - max_new - margin.

    Returns `(prompt, keep)` where `keep` is how many trailing frames of
    `all_frames` were included. If not even one frame fits, the header-only
    prompt is returned with `keep == 0` (without re-checking its length).
    """
    def token_count(s: str) -> int:
        return len(llm.tokenize(s.encode("utf-8"), add_bos=True))

    budget_tokens = N_CTX - max_new - CTX_MARGIN
    keep = len(all_frames)
    # Drop the oldest frame one at a time until the prompt fits the budget.
    while keep > 0:
        prompt = _build_prompt(header, all_frames[-keep:])
        if token_count(prompt) <= budget_tokens:
            return prompt, keep
        keep -= 1
    return _build_prompt(header, []), 0


# -----------------------------------------------------------------------------
# Scenario helpers (UI)
# -----------------------------------------------------------------------------
def scene_to_json(name: str) -> str:
    """Return the named scenario's header and initial frames as pretty JSON.

    Unknown names yield the string "{}".
    """
    sc = SCENARIOS.get(name)
    if not sc:
        return "{}"
    return json.dumps(
        {"header": sc["header"], "initial_frames": sc["initial_frames"]},
        indent=2,
        ensure_ascii=False,
    )


# -----------------------------------------------------------------------------
#
Interactive canvas editor (Konva-in-iframe; gradio doesn't sanitize iframe # srcdoc, so the JS reliably runs and can talk back to a hidden gr.Textbox). # ----------------------------------------------------------------------------- _EDITOR_IFRAME = r"""