| | """WorldFlux imagination demo powered by actual WorldFlux model inference.""" |
| |
|
| | from __future__ import annotations |
| |
|
| | from functools import lru_cache |
| | from pathlib import Path |
| | from typing import Any |
| |
|
| | import gradio as gr |
| | import numpy as np |
| | import torch |
| |
|
| | from worldflux import create_world_model |
| |
|
# Construction settings for each selectable model, keyed by the display name
# used in the UI. _load_model forwards "model_id" positionally and
# "obs_shape"/"action_dim" as keyword arguments to create_world_model.
MODEL_SPECS: dict[str, dict[str, Any]] = {
    # Three-element obs_shape: unpacked as (channels, height, width) by
    # _build_initial_obs.
    "DreamerV3": dict(
        model_id="dreamerv3:size12m",
        obs_shape=(3, 64, 64),
        action_dim=6,
    ),
    # One-element obs_shape: a flat observation vector of that length.
    "TD-MPC2": dict(
        model_id="tdmpc2:5m",
        obs_shape=(39,),
        action_dim=6,
    ),
}
| |
|
| |
|
| | def _to_numpy_frame(tensor: torch.Tensor) -> np.ndarray | None: |
| | value = tensor.detach().cpu() |
| | if value.ndim == 4: |
| | value = value[0] |
| | if value.ndim == 3: |
| | frame = value.numpy() |
| | if frame.shape[0] in {1, 3}: |
| | frame = np.transpose(frame, (1, 2, 0)) |
| | frame = np.nan_to_num(frame.astype(np.float32)) |
| | if frame.shape[-1] == 1: |
| | frame = np.repeat(frame, 3, axis=-1) |
| | if frame.max() > frame.min(): |
| | frame = (frame - frame.min()) / (frame.max() - frame.min()) |
| | return frame |
| | return None |
| |
|
| |
|
def _extract_predictions(decoded: Any) -> dict[str, torch.Tensor]:
    """Return the prediction mapping carried by a decode result.

    Args:
        decoded: Either a dict of predictions, or an object that may expose
            such a dict through a ``predictions`` attribute.

    Returns:
        ``decoded`` itself when it is a dict; otherwise its ``predictions``
        attribute when that is a dict; otherwise an empty dict.
    """
    if isinstance(decoded, dict):
        candidate = decoded
    else:
        candidate = getattr(decoded, "predictions", None)
    return candidate if isinstance(candidate, dict) else {}
| |
|
| |
|
@lru_cache(maxsize=8)
def _load_model(model_type: str, checkpoint_path: str) -> Any:
    """Build (and cache) the world model for a model type and checkpoint.

    The preset model is always created from ``MODEL_SPECS``. When
    ``checkpoint_path`` is non-empty and exists on disk, the preset is
    replaced by ``from_pretrained`` called on the preset's class. When the
    path is non-empty but missing, a ``RuntimeWarning`` is issued and the
    preset model is returned, instead of falling back silently.

    Results are memoised per ``(model_type, checkpoint_path)`` pair, so a
    fallback made for a missing checkpoint stays cached under that path until
    the entry is evicted.

    Args:
        model_type: Key into ``MODEL_SPECS``.
        checkpoint_path: Checkpoint location (``~`` is expanded), or an empty
            string to use the preset model.

    Returns:
        The model, switched to evaluation mode via ``eval()``.

    Raises:
        KeyError: If ``model_type`` is not a key of ``MODEL_SPECS``.
    """
    import warnings

    spec = MODEL_SPECS[model_type]
    model = create_world_model(
        spec["model_id"],
        obs_shape=spec["obs_shape"],
        action_dim=spec["action_dim"],
    )

    if checkpoint_path:
        resolved_checkpoint = Path(checkpoint_path).expanduser()
        if resolved_checkpoint.exists():
            # The preset instance is only needed for its class, which
            # provides the from_pretrained constructor.
            model = type(model).from_pretrained(str(resolved_checkpoint))
        else:
            warnings.warn(
                f"Checkpoint path {str(resolved_checkpoint)!r} does not exist; "
                f"using the {model_type} preset model instead.",
                RuntimeWarning,
                stacklevel=2,
            )

    model.eval()
    return model
| |
|
| |
|
def _build_initial_obs(model_type: str) -> torch.Tensor:
    """Create a deterministic synthetic observation with a leading dim of 1.

    For a one-element ``obs_shape`` the observation is a linear ramp from -1
    to 1. For a three-element ``obs_shape`` (channels, height, width) each
    channel is a 0-to-1 ramp over the pixels, shifted by
    ``channel_index / channels`` and wrapped modulo 1.

    Args:
        model_type: Key into ``MODEL_SPECS``.

    Returns:
        A float32 tensor of shape ``(1, *obs_shape)``.

    Raises:
        ValueError: If ``obs_shape`` has neither one nor three elements.
    """
    obs_shape = MODEL_SPECS[model_type]["obs_shape"]
    rank = len(obs_shape)

    if rank == 1:
        ramp = np.linspace(-1.0, 1.0, obs_shape[0], dtype=np.float32)
        return torch.from_numpy(ramp).unsqueeze(0)

    if rank != 3:
        raise ValueError(f"Unsupported observation shape: {obs_shape}")

    n_channels, n_rows, n_cols = obs_shape
    base = np.linspace(0.0, 1.0, n_rows * n_cols, dtype=np.float32).reshape(n_rows, n_cols)
    divisor = max(1, n_channels)
    planes = []
    for index in range(n_channels):
        # Shift each channel by a different offset so the channels differ.
        planes.append((base + index / divisor) % 1.0)
    stacked = np.stack(planes, axis=0)
    return torch.from_numpy(stacked).unsqueeze(0)
| |
|
| |
|
def _build_action_sequence(model_type: str, horizon: int, device: torch.device) -> torch.Tensor:
    """Create a one-hot action sequence that cycles through the action indices.

    Step ``t`` sets index ``t % action_dim`` to 1.0 and leaves the rest at 0.

    Args:
        model_type: Key into ``MODEL_SPECS`` (supplies ``action_dim``).
        horizon: Number of steps in the sequence.
        device: Device on which the tensor is allocated.

    Returns:
        A tensor of shape ``(horizon, 1, action_dim)``.
    """
    action_dim = MODEL_SPECS[model_type]["action_dim"]
    actions = torch.zeros(horizon, 1, action_dim, device=device)
    # Compute the active index per step in Python, then set them all with a
    # single indexed assignment instead of a per-step loop.
    hot = torch.tensor(
        [step % action_dim for step in range(horizon)],
        dtype=torch.long,
        device=device,
    )
    steps = torch.arange(horizon, device=device)
    actions[steps, 0, hot] = 1.0
    return actions
| |
|
| |
|
def _plot_rewards(rewards: list[float]):
    """Draw the predicted reward per imagined time step as a line chart.

    The figure is created directly from ``matplotlib.figure.Figure`` rather
    than through pyplot. pyplot keeps a global reference to every figure it
    creates until it is explicitly closed, so building one per request in a
    long-running server would accumulate figures.

    Args:
        rewards: Predicted rewards, one value per time step.

    Returns:
        The matplotlib figure containing the chart.
    """
    from matplotlib.figure import Figure

    fig = Figure(figsize=(8, 4))
    ax = fig.subplots()
    ax.plot(rewards, marker="o")
    ax.set_xlabel("Time Step")
    ax.set_ylabel("Predicted Reward")
    ax.set_title("Imagined Rewards")
    ax.grid(True, alpha=0.3)
    return fig
| |
|
| |
|
def _plot_continues(continues: list[float]):
    """Draw the continue probability per imagined time step as a line chart.

    The y-axis is fixed to [0, 1]. The figure is created directly from
    ``matplotlib.figure.Figure`` rather than through pyplot. pyplot keeps a
    global reference to every figure it creates until it is explicitly
    closed, so building one per request in a long-running server would
    accumulate figures.

    Args:
        continues: Continue probabilities, one value per time step.

    Returns:
        The matplotlib figure containing the chart.
    """
    from matplotlib.figure import Figure

    fig = Figure(figsize=(8, 4))
    ax = fig.subplots()
    ax.plot(continues, marker="s", color="green")
    ax.set_xlabel("Time Step")
    ax.set_ylabel("Continue Probability")
    ax.set_title("Episode Continuation")
    ax.grid(True, alpha=0.3)
    ax.set_ylim([0, 1])
    return fig
| |
|
| |
|
def _plot_frames(frames: list[np.ndarray]):
    """Show up to the first five imagined frames side by side.

    When ``frames`` is empty, the figure carries an explanatory message
    instead of an image. The figure is created directly from
    ``matplotlib.figure.Figure`` rather than through pyplot. pyplot keeps a
    global reference to every figure it creates until it is explicitly
    closed, so building one per request in a long-running server would
    accumulate figures.

    Args:
        frames: Image arrays to preview. They are concatenated along axis 1,
            so they must agree on every other dimension.

    Returns:
        The matplotlib figure containing the preview or the message.
    """
    from matplotlib.figure import Figure

    fig = Figure(figsize=(8, 4))
    ax = fig.subplots()
    if not frames:
        ax.text(
            0.5,
            0.5,
            "This model does not decode image observations.\n(Reward/continue are real model outputs)",
            ha="center",
            va="center",
            fontsize=10,
        )
        ax.axis("off")
        return fig

    # Slicing already stops at the end of the list, so no explicit min() is
    # needed to cap the preview at five frames.
    preview = np.concatenate(frames[:5], axis=1)
    ax.imshow(preview)
    ax.set_title("Imagined Frame Preview")
    ax.axis("off")
    return fig
| |
|
| |
|
def run_imagination(model_type: str, horizon: int, checkpoint_path: str):
    """Run encode -> rollout -> decode and build the plots shown in the UI.

    Rewards and continue values are taken from the rollout trajectory when it
    exposes them as tensors. Otherwise they are collected step by step from
    the decoded predictions, and as a last resort filled with 0.0 / 1.0.

    Args:
        model_type: Key into ``MODEL_SPECS``.
        horizon: Number of imagined steps; converted with ``int()``.
        checkpoint_path: Optional checkpoint location. Empty or ``None``
            selects the preset model.

    Returns:
        A tuple ``(rewards_plot, continues_plot, frames_plot, status)``.
    """
    steps = int(horizon)
    # Tolerate None in case the UI delivers no value for an empty textbox.
    checkpoint = (checkpoint_path or "").strip()

    model = _load_model(model_type, checkpoint)
    device = next(model.parameters()).device
    initial_obs = _build_initial_obs(model_type).to(device=device)
    action_sequence = _build_action_sequence(model_type, steps, device)

    rewards: list[float] = []
    continues: list[float] = []
    frames: list[np.ndarray] = []

    with torch.no_grad():
        state = model.encode({"obs": initial_obs})
        trajectory = model.rollout(state, action_sequence)

        # reshape(-1) rather than view(-1): view raises on non-contiguous
        # tensors, reshape copies when it has to.
        rollout_rewards = getattr(trajectory, "rewards", None)
        if isinstance(rollout_rewards, torch.Tensor):
            rewards = rollout_rewards.detach().cpu().reshape(-1).tolist()
        rollout_continues = getattr(trajectory, "continues", None)
        if isinstance(rollout_continues, torch.Tensor):
            # NOTE(review): sigmoid assumes the model emits continue logits,
            # not probabilities - confirm against the model API.
            continues = torch.sigmoid(rollout_continues).detach().cpu().reshape(-1).tolist()

        # Decide once whether the per-step fallback is needed. Re-testing the
        # lists inside the loop would stop collecting after the first append,
        # leaving a single value instead of one per step.
        need_rewards = not rewards
        need_continues = not continues

        # The first state is skipped - presumably it is the encoded initial
        # state rather than an imagined one; verify against rollout().
        for state_t in trajectory.states[1:]:
            predictions = _extract_predictions(model.decode(state_t))

            reward_pred = predictions.get("reward")
            if need_rewards and isinstance(reward_pred, torch.Tensor):
                rewards.append(float(reward_pred.detach().cpu().reshape(-1)[0]))

            continue_pred = predictions.get("continue")
            if need_continues and isinstance(continue_pred, torch.Tensor):
                continues.append(
                    float(torch.sigmoid(continue_pred).detach().cpu().reshape(-1)[0])
                )

            obs_pred = predictions.get("obs")
            if isinstance(obs_pred, torch.Tensor):
                frame = _to_numpy_frame(obs_pred)
                if frame is not None:
                    frames.append(frame)

    # Last resort so the plots are never empty.
    if not rewards:
        rewards = [0.0] * steps
    if not continues:
        continues = [1.0] * steps

    rewards_plot = _plot_rewards(rewards)
    continues_plot = _plot_continues(continues)
    frames_plot = _plot_frames(frames)

    # Report what was actually used: _load_model falls back to the preset
    # when the given path does not exist, so the status must not claim that
    # the checkpoint was loaded in that case.
    if not checkpoint:
        source = "model preset"
    elif Path(checkpoint).expanduser().exists():
        source = checkpoint
    else:
        source = f"model preset; checkpoint not found: {checkpoint}"
    status = f"Ran {model_type} inference for {steps} steps (checkpoint={source})"
    return rewards_plot, continues_plot, frames_plot, status
| |
|
| |
|
# The UI is assembled at import time so that `demo` exists as a module-level
# object. NOTE(review): the nesting below (three plots inside the row, status
# box beneath it) is reconstructed - confirm against the intended layout.
with gr.Blocks() as demo:
    gr.Markdown("# WorldFlux Demo")
    gr.Markdown("Actual WorldFlux encode → rollout → decode inference (no random mock outputs)")

    # Derive the choices from MODEL_SPECS so the dropdown can never offer a
    # model that _load_model has no spec for; the first entry is the default.
    model_type = gr.Dropdown(
        choices=list(MODEL_SPECS),
        value=next(iter(MODEL_SPECS)),
        label="Model Type",
    )
    checkpoint_path = gr.Textbox(
        label="Checkpoint Path (optional)",
        placeholder="/data/checkpoints/dreamer_final",
    )
    horizon = gr.Slider(5, 50, value=15, step=1, label="Imagination Horizon")
    btn = gr.Button("Run Imagination")

    with gr.Row():
        rewards_plot = gr.Plot(label="Rewards")
        continues_plot = gr.Plot(label="Continues")
        frames_plot = gr.Plot(label="Imagined Frames")
    output_text = gr.Textbox(label="Status")

    # The input order matches run_imagination's parameter order and the
    # output order matches the tuple it returns.
    btn.click(
        run_imagination,
        inputs=[model_type, horizon, checkpoint_path],
        outputs=[rewards_plot, continues_plot, frames_plot, output_text],
    )


if __name__ == "__main__":
    demo.launch()
| |
|