# demo/app.py
# Synced from GitHub by github-actions[bot]: f981b5b319b104ae2ede1bb5a4b163e5d21c968a (7e62de3)
"""WorldFlux imagination demo powered by actual WorldFlux model inference."""
from __future__ import annotations
from functools import lru_cache
from pathlib import Path
from typing import Any
import gradio as gr
import numpy as np
import torch
from worldflux import create_world_model
# Construction presets for each model offered by the demo, keyed by the
# display name. Each entry carries the WorldFlux model id plus the
# observation shape and action dimensionality the model is built with.
MODEL_SPECS = {
    display_name: {
        "model_id": model_id,
        "obs_shape": obs_shape,
        "action_dim": action_dim,
    }
    for display_name, model_id, obs_shape, action_dim in (
        ("DreamerV3", "dreamerv3:size12m", (3, 64, 64), 6),
        ("TD-MPC2", "tdmpc2:5m", (39,), 6),
    )
}
def _to_numpy_frame(tensor: torch.Tensor) -> np.ndarray | None:
    """Convert a decoded observation tensor into a displayable image array.

    A 4-D input is reduced to its first element along the leading axis. The
    remaining 3-D tensor is moved to channel-last layout when its first axis
    has size 1 or 3, NaNs are replaced, single-channel images are repeated to
    three channels, and values are min-max scaled into ``[0, 1]``.

    Args:
        tensor: Decoded observation, 3-D or 4-D. Any dtype that can be cast
            to float32 (including bfloat16/float16) is accepted.

    Returns:
        A float32 array with values in ``[0, 1]``, or ``None`` when the input
        is not 3-D after dropping the leading axis or holds no elements.
    """
    value = tensor.detach().cpu()
    if value.ndim == 4:
        value = value[0]
    if value.ndim != 3:
        return None

    # Cast inside torch: NumPy has no bfloat16, so calling ``.numpy()`` on a
    # bf16 tensor raises before any NumPy-side ``astype`` could run.
    frame = value.to(torch.float32).numpy()
    if frame.size == 0:
        # Nothing to draw, and min()/max() would raise on an empty array.
        return None

    if frame.shape[0] in {1, 3}:
        frame = np.transpose(frame, (1, 2, 0))
    frame = np.nan_to_num(frame)
    if frame.shape[-1] == 1:
        frame = np.repeat(frame, 3, axis=-1)

    low, high = frame.min(), frame.max()
    if high > low:
        return (frame - low) / (high - low)
    # A constant frame cannot be min-max scaled; clamp it so the result still
    # honours the [0, 1] range expected for float image data.
    return np.clip(frame, 0.0, 1.0)
def _extract_predictions(decoded: Any) -> dict[str, torch.Tensor]:
    """Return the prediction mapping carried by a decode result.

    ``decoded`` may already be a dict, or an object exposing a
    ``predictions`` attribute. Anything else (including a non-dict
    ``predictions`` attribute) yields an empty dict.
    """
    candidate = decoded if isinstance(decoded, dict) else getattr(decoded, "predictions", {})
    return candidate if isinstance(candidate, dict) else {}
def _load_model(model_type: str, checkpoint_path: str) -> Any:
    """Return an eval-mode world model for ``model_type``, cached per source.

    When ``checkpoint_path`` is non-empty and exists on disk, the model is
    loaded from it; otherwise the preset described by ``MODEL_SPECS`` is
    used. A path that does not exist triggers a warning and falls back to
    the preset.

    The existence check is done here, outside the cache, so that a fallback
    is cached under the preset key rather than under the missing path. That
    way the checkpoint is picked up as soon as it appears on disk instead of
    the preset being served for that path for the rest of the process.

    Args:
        model_type: Key into ``MODEL_SPECS``.
        checkpoint_path: Optional checkpoint location; ``~`` is expanded.

    Raises:
        KeyError: If ``model_type`` is not a key of ``MODEL_SPECS``.
    """
    import warnings

    resolved = Path(checkpoint_path).expanduser() if checkpoint_path else None
    if resolved is not None and not resolved.exists():
        warnings.warn(
            f"Checkpoint {str(resolved)!r} not found; using the {model_type} preset instead.",
            stacklevel=2,
        )
        resolved = None
    return _load_model_cached(model_type, str(resolved) if resolved is not None else "")


@lru_cache(maxsize=8)
def _load_model_cached(model_type: str, checkpoint: str) -> Any:
    """Build (and memoise) the model; an empty ``checkpoint`` means the preset."""
    spec = MODEL_SPECS[model_type]
    model = create_world_model(
        spec["model_id"],
        obs_shape=spec["obs_shape"],
        action_dim=spec["action_dim"],
    )
    if checkpoint:
        # The preset instance is only used to find the concrete model class
        # whose ``from_pretrained`` loads the checkpoint.
        model = model.__class__.from_pretrained(checkpoint)
    model.eval()
    return model
def _build_initial_obs(model_type: str) -> torch.Tensor:
    """Create a deterministic starting observation with a leading batch axis of 1.

    Vector observations (1-D shape) are a linear ramp from -1 to 1. Image
    observations (3-D shape, unpacked as channels/height/width) are a 0..1
    ramp over the pixels, shifted per channel and wrapped modulo 1.

    Raises:
        ValueError: If the configured observation shape is neither 1-D nor 3-D.
    """
    obs_shape = MODEL_SPECS[model_type]["obs_shape"]
    rank = len(obs_shape)

    if rank == 1:
        ramp = np.linspace(-1.0, 1.0, obs_shape[0], dtype=np.float32)
        return torch.from_numpy(ramp).unsqueeze(0)

    if rank != 3:
        raise ValueError(f"Unsupported observation shape: {obs_shape}")

    channels, height, width = obs_shape
    base = np.linspace(0.0, 1.0, height * width, dtype=np.float32).reshape(height, width)
    divisor = max(1, channels)
    planes = []
    for index in range(channels):
        # Each channel gets its own phase offset so the planes differ.
        planes.append((base + index / divisor) % 1.0)
    return torch.from_numpy(np.stack(planes, axis=0)).unsqueeze(0)
def _build_action_sequence(model_type: str, horizon: int, device: torch.device) -> torch.Tensor:
    """Build a ``(horizon, 1, action_dim)`` tensor of cycling one-hot actions.

    Step ``t`` has a single 1.0 at index ``t % action_dim``; every other
    entry is zero. The tensor is allocated on ``device``.
    """
    width = MODEL_SPECS[model_type]["action_dim"]
    sequence = torch.zeros(horizon, 1, width, device=device)
    # Iterating the (horizon, action_dim) slice yields per-step views, so
    # writing into a row updates ``sequence`` in place.
    for step, row in enumerate(sequence[:, 0]):
        row[step % width] = 1.0
    return sequence
def _plot_rewards(rewards: list[float]):
    """Return a matplotlib figure plotting the predicted reward per time step.

    The figure is created directly from ``matplotlib.figure.Figure`` instead
    of ``pyplot``: pyplot keeps every figure it creates in a global registry
    until it is explicitly closed, so a long-running server that builds a new
    figure on each request would otherwise accumulate them.
    """
    from matplotlib.figure import Figure

    fig = Figure(figsize=(8, 4))
    ax = fig.subplots()
    ax.plot(rewards, marker="o")
    ax.set_xlabel("Time Step")
    ax.set_ylabel("Predicted Reward")
    ax.set_title("Imagined Rewards")
    ax.grid(True, alpha=0.3)
    return fig
def _plot_continues(continues: list[float]):
    """Return a matplotlib figure plotting continue probability per time step.

    The y-axis is fixed to ``[0, 1]``. The figure is created directly from
    ``matplotlib.figure.Figure`` instead of ``pyplot``: pyplot keeps every
    figure it creates in a global registry until it is explicitly closed, so
    a long-running server that builds a new figure on each request would
    otherwise accumulate them.
    """
    from matplotlib.figure import Figure

    fig = Figure(figsize=(8, 4))
    ax = fig.subplots()
    ax.plot(continues, marker="s", color="green")
    ax.set_xlabel("Time Step")
    ax.set_ylabel("Continue Probability")
    ax.set_title("Episode Continuation")
    ax.grid(True, alpha=0.3)
    ax.set_ylim([0, 1])
    return fig
def _plot_frames(frames: list[np.ndarray]):
    """Return a matplotlib figure previewing the imagined frames.

    Up to the first five frames are concatenated along axis 1 and shown as a
    single image. When ``frames`` is empty, the figure carries an explanatory
    message instead.

    The figure is created directly from ``matplotlib.figure.Figure`` instead
    of ``pyplot``: pyplot keeps every figure it creates in a global registry
    until it is explicitly closed, so a long-running server that builds a new
    figure on each request would otherwise accumulate them.
    """
    from matplotlib.figure import Figure

    fig = Figure(figsize=(8, 4))
    ax = fig.subplots()
    if not frames:
        ax.text(
            0.5,
            0.5,
            "This model does not decode image observations.\n(Reward/continue are real model outputs)",
            ha="center",
            va="center",
            fontsize=10,
        )
        ax.axis("off")
        return fig

    # Slicing already clamps to the list length, so no explicit min() is needed.
    preview = np.concatenate(frames[:5], axis=1)
    ax.imshow(preview)
    ax.set_title("Imagined Frame Preview")
    ax.axis("off")
    return fig
def run_imagination(model_type: str, horizon: int, checkpoint_path: str):
    """Run encode -> rollout -> decode on a synthetic observation and plot it.

    Rewards and continue values are taken from the rollout trajectory when it
    exposes them as tensors; otherwise they are collected step by step from
    the decoded predictions. If neither source provides them, constant
    placeholders (0.0 rewards, 1.0 continues) of length ``horizon`` are used.
    Continue values are passed through a sigmoid before plotting.

    Args:
        model_type: Key into ``MODEL_SPECS``.
        horizon: Number of imagined steps (coerced to ``int``).
        checkpoint_path: Optional checkpoint location; blank or ``None``
            selects the model preset.

    Returns:
        ``(rewards_plot, continues_plot, frames_plot, status)``: three
        matplotlib figures and a status string.
    """
    steps = int(horizon)
    checkpoint = (checkpoint_path or "").strip()

    model = _load_model(model_type, checkpoint)
    first_param = next(iter(model.parameters()), None)
    device = first_param.device if first_param is not None else torch.device("cpu")
    initial_obs = _build_initial_obs(model_type).to(device=device)
    action_sequence = _build_action_sequence(model_type, steps, device)

    rewards: list[float] = []
    continues: list[float] = []
    frames: list[np.ndarray] = []

    with torch.no_grad():
        state = model.encode({"obs": initial_obs})
        trajectory = model.rollout(state, action_sequence)

        # reshape() rather than view(): view() raises on non-contiguous tensors.
        rollout_rewards = getattr(trajectory, "rewards", None)
        if isinstance(rollout_rewards, torch.Tensor):
            rewards = rollout_rewards.detach().cpu().reshape(-1).tolist()
        rollout_continues = getattr(trajectory, "continues", None)
        if isinstance(rollout_continues, torch.Tensor):
            continues = torch.sigmoid(rollout_continues).detach().cpu().reshape(-1).tolist()

        # Decide once, before the loop, whether the decode fallback is needed.
        # Checking ``not rewards`` inside the loop would stop collecting after
        # the first appended value and leave a single-point series.
        collect_rewards = not rewards
        collect_continues = not continues

        for state_t in trajectory.states[1:]:
            predictions = _extract_predictions(model.decode(state_t))

            reward_pred = predictions.get("reward")
            if collect_rewards and isinstance(reward_pred, torch.Tensor):
                rewards.append(float(reward_pred.detach().cpu().reshape(-1)[0]))

            continue_pred = predictions.get("continue")
            if collect_continues and isinstance(continue_pred, torch.Tensor):
                continues.append(
                    float(torch.sigmoid(continue_pred).detach().cpu().reshape(-1)[0])
                )

            obs_pred = predictions.get("obs")
            if isinstance(obs_pred, torch.Tensor):
                frame = _to_numpy_frame(obs_pred)
                if frame is not None:
                    frames.append(frame)

    if not rewards:
        rewards = [0.0] * steps
    if not continues:
        continues = [1.0] * steps

    rewards_plot = _plot_rewards(rewards)
    continues_plot = _plot_continues(continues)
    frames_plot = _plot_frames(frames)

    # The loader only uses a checkpoint that exists on disk, so report the
    # preset (and why) when the requested path is missing.
    if not checkpoint:
        source = "model preset"
    elif Path(checkpoint).expanduser().exists():
        source = checkpoint
    else:
        source = f"model preset; checkpoint not found: {checkpoint}"
    status = f"Ran {model_type} inference for {steps} steps (checkpoint={source})"
    return rewards_plot, continues_plot, frames_plot, status
# Gradio UI: model/checkpoint/horizon inputs, a run button, and three plots
# plus a status line wired to ``run_imagination``.
with gr.Blocks() as demo:
    gr.Markdown("# WorldFlux Demo")
    gr.Markdown("Actual WorldFlux encode → rollout → decode inference (no random mock outputs)")

    # Offer exactly the models that MODEL_SPECS can build, so the dropdown
    # cannot drift from the registry; the first entry is the default.
    model_names = list(MODEL_SPECS)
    model_type = gr.Dropdown(choices=model_names, value=model_names[0], label="Model Type")
    checkpoint_path = gr.Textbox(
        label="Checkpoint Path (optional)",
        placeholder="/data/checkpoints/dreamer_final",
    )
    horizon = gr.Slider(5, 50, value=15, step=1, label="Imagination Horizon")
    btn = gr.Button("Run Imagination")

    # NOTE(review): the source lost its indentation; the row is assumed to
    # hold the two line plots with the frame preview below it -- confirm.
    with gr.Row():
        rewards_plot = gr.Plot(label="Rewards")
        continues_plot = gr.Plot(label="Continues")
    frames_plot = gr.Plot(label="Imagined Frames")
    output_text = gr.Textbox(label="Status")

    # Input/output order must match run_imagination's parameters and returns.
    btn.click(
        run_imagination,
        inputs=[model_type, horizon, checkpoint_path],
        outputs=[rewards_plot, continues_plot, frames_plot, output_text],
    )

if __name__ == "__main__":
    demo.launch()