#!/usr/bin/env python3 """Export paper figures, tables, raw data, and provenance files. All values are read from experiments/reports/*.json and GIF frames are extracted from experiments/reports/paper_planning/gifs/*.gif. """ from __future__ import annotations import csv import json import math import re import shutil import subprocess from collections import defaultdict from dataclasses import dataclass from functools import lru_cache from pathlib import Path from typing import Any, Iterable from PIL import Image, ImageDraw, ImageFont, ImageSequence ROOT = Path(__file__).resolve().parents[1] REPORTS = ROOT / "experiments" / "reports" PLANNING_DIR = REPORTS / "paper_planning" GIF_DIR = PLANNING_DIR / "gifs" OUT = REPORTS / "paper_artifacts" PREDICTION_JSON = REPORTS / "paper_prediction.json" PROBE_JSON = REPORTS / "paper_flowmo_latent_probes.json" TASK_ORDER = ["reach_target", "station_keeping", "waypoint_square", "waypoint_zigzag"] BOAT_ORDER = ["twin", "triangle"] FLOW_ORDER = [ "noflow", "uniform", "vortex_center", "double_gyre", "source_sink", "source_sink_pair", "gradient", "shear", "turbulent_patch", "random_fourier", ] METHOD_ORDER = [ "flowmo", "leworldmodel", "planet", "tdmpc2", "pid_los_controller", "no_flow_los_controller", "current_estimator_los_controller", "oracle_flow_los_controller", ] LEARNED_METHODS = ["flowmo", "leworldmodel", "planet", "tdmpc2"] TRADITIONAL_METHODS = [ "pid_los_controller", "no_flow_los_controller", "current_estimator_los_controller", "oracle_flow_los_controller", ] METHOD_LABEL = { "flowmo": "FlowMo-WM", "leworldmodel": "LeWorldModel", "planet": "PlaNet/RSSM", "tdmpc2": "TD-MPC2", "pid_los_controller": "PID/LOS", "no_flow_los_controller": "No-Flow LOS", "current_estimator_los_controller": "Current-Estimator LOS", "oracle_flow_los_controller": "Oracle-Flow LOS", } METHOD_DESCRIPTION = { "pid_los_controller": "Line-of-sight waypoint tracking baseline using the clean-image pose estimate.", "no_flow_los_controller": "Line-of-sight tracking that ignores ambient flow; measures the cost of no current compensation.", "current_estimator_los_controller": "Line-of-sight tracking with an online drift estimate from recent pose history.", "oracle_flow_los_controller": "Line-of-sight tracking with privileged true local simulator flow feed-forward; a reference controller, not a world-model baseline.", } METHOD_SHORT = { "flowmo": "FlowMo", "leworldmodel": "LeWM", "planet": "RSSM", "tdmpc2": "TD2", "pid_los_controller": "PID/LOS", "no_flow_los_controller": "NF-LOS", "current_estimator_los_controller": "CE-LOS", "oracle_flow_los_controller": "OF-LOS", } TASK_LABEL = { "reach_target": "Reach", "station_keeping": "Station", "waypoint_square": "Square", "waypoint_zigzag": "Zigzag", } BOAT_LABEL = {"twin": "Twin", "triangle": "Triangle"} FLOW_LABEL = { "noflow": "No flow", "uniform": "Uniform", "vortex_center": "Vortex", "double_gyre": "Double gyre", "source_sink": "Source/sink", "source_sink_pair": "Src/sink pair", "gradient": "Gradient", "shear": "Shear", "turbulent_patch": "Turbulent", "random_fourier": "Fourier", } HORIZONS = [1, 3, 6, 8, 10, 20, 30, 40, 60] FIG3_TASK_FLOW = { "reach_target": "uniform", "station_keeping": "vortex_center", "waypoint_square": "gradient", "waypoint_zigzag": "random_fourier", } FIG3_EPISODE = 0 METHOD_COLORS = { "flowmo": (31, 119, 180), "leworldmodel": (255, 127, 14), "planet": (44, 160, 44), "tdmpc2": (148, 103, 189), "pid_los_controller": (127, 127, 127), "no_flow_los_controller": (214, 39, 40), "current_estimator_los_controller": (23, 190, 207), "oracle_flow_los_controller": (140, 86, 75), } @dataclass(frozen=True) class SummaryRecord: source_file: str item_index: int method: str task: str boat: str flow_type: str context_mode: str episodes: int successes: int success_rate: float final_distance_mean: float mean_min_goal_distance: float path_length_success_mean: float | None energy_success_mean: float | None steps_success_mean: float | None @dataclass(frozen=True) class EpisodeRecord: source_file: str item_index: int result_index: int method: str task: str boat: str flow_type: str context_mode: str episode: int success: bool final_distance: float mean_min_goal_distance: float energy: float | None path_length: float | None steps: int | None def rel(path: Path) -> str: return str(path.relative_to(ROOT)) def ensure_dir(path: Path) -> None: path.mkdir(parents=True, exist_ok=True) def trim_whitespace(img: Image.Image, pad_x: int = 8, pad_y: int = 0, threshold: int = 250) -> Image.Image: """Crop near-white border while preserving a small horizontal margin.""" rgb = img.convert("RGB") pix = rgb.load() w, h = rgb.size min_x, min_y = w, h max_x, max_y = -1, -1 for y in range(h): for x in range(w): r, g, b = pix[x, y] if r < threshold or g < threshold or b < threshold: min_x = min(min_x, x) min_y = min(min_y, y) max_x = max(max_x, x) max_y = max(max_y, y) if max_x < min_x or max_y < min_y: return rgb min_x = max(0, min_x - pad_x) max_x = min(w - 1, max_x + pad_x) min_y = max(0, min_y - pad_y) max_y = min(h - 1, max_y + pad_y) return rgb.crop((min_x, min_y, max_x + 1, max_y + 1)) def read_json(path: Path) -> Any: with path.open("r", encoding="utf-8") as f: return json.load(f) def write_text(path: Path, text: str) -> None: ensure_dir(path.parent) path.write_text(text, encoding="utf-8") def safe_float(value: Any) -> float | None: if value is None: return None if isinstance(value, float) and math.isnan(value): return None return float(value) def fmt(value: float | int | None, digits: int = 3) -> str: if value is None: return "--" if isinstance(value, float) and math.isnan(value): return "--" return f"{float(value):.{digits}f}" def pct(value: float | None, digits: int = 1) -> str: if value is None: return "--" return f"{100.0 * float(value):.{digits}f}" def latex_escape(text: str) -> str: repl = { "\\": r"\textbackslash{}", "&": r"\&", "%": r"\%", "$": r"\$", "#": r"\#", "_": r"\_", "{": r"\{", "}": r"\}", "~": r"\textasciitilde{}", "^": r"\textasciicircum{}", } return "".join(repl.get(ch, ch) for ch in text) def write_table(path: Path, header: list[str], rows: list[list[str]], caption: str, label: str) -> None: colspec = "l" * len(header) lines = [ r"\begin{table*}[t]", r"\centering", rf"\caption{{{caption}}}", rf"\label{{{label}}}", r"\scriptsize", rf"\begin{{tabular}}{{{colspec}}}", r"\toprule", " & ".join(latex_escape(h) for h in header) + r" \\", r"\midrule", ] for row in rows: lines.append(" & ".join(latex_escape(x) for x in row) + r" \\") lines += [r"\bottomrule", r"\end{tabular}", r"\end{table*}", ""] write_text(path, "\n".join(lines)) def write_rows(path_base: Path, rows: list[dict[str, Any]], fieldnames: list[str]) -> None: ensure_dir(path_base.parent) for ext, dialect in [(".tsv", "excel-tab"), (".csv", "excel")]: with (path_base.with_suffix(ext)).open("w", encoding="utf-8", newline="") as f: writer = csv.DictWriter(f, fieldnames=fieldnames, dialect=dialect) writer.writeheader() for row in rows: writer.writerow({k: row.get(k, "") for k in fieldnames}) @lru_cache(maxsize=2) def arial_font_path(bold: bool) -> str | None: family = "Arial:style=Bold" if bold else "Arial:style=Regular" try: path = subprocess.check_output(["fc-match", "-f", "%{file}", family], text=True).strip() except (OSError, subprocess.CalledProcessError): return None return path or None def font(size: int, bold: bool = False) -> ImageFont.FreeTypeFont | ImageFont.ImageFont: matched = arial_font_path(bold) candidates = [ matched, "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf" if bold else "/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf", "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf" if bold else "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", "/usr/share/fonts/dejavu/DejaVuSans-Bold.ttf" if bold else "/usr/share/fonts/dejavu/DejaVuSans.ttf", ] for path in candidates: if not path: continue try: return ImageFont.truetype(path, size=size) except OSError: pass return ImageFont.load_default() def load_planning() -> tuple[list[SummaryRecord], list[EpisodeRecord]]: summaries: list[SummaryRecord] = [] episodes: list[EpisodeRecord] = [] for path in sorted(PLANNING_DIR.glob("*.json")): data = read_json(path) if not isinstance(data, list): raise ValueError(f"Expected list in {path}") for item_index, item in enumerate(data): method = item["method"] task = item["task"] boat = item["boat"] flow_type = item["flow_type"] for context_mode, metrics in item["by_context"].items(): summaries.append( SummaryRecord( source_file=rel(path), item_index=item_index, method=method, task=task, boat=boat, flow_type=flow_type, context_mode=context_mode, episodes=int(metrics["episodes"]), successes=int(metrics["successes"]), success_rate=float(metrics["success_rate"]), final_distance_mean=float(metrics["final_distance_mean"]), mean_min_goal_distance=float(metrics["mean_min_goal_distance"]), path_length_success_mean=safe_float(metrics.get("path_length_success_mean")), energy_success_mean=safe_float(metrics.get("energy_success_mean")), steps_success_mean=safe_float(metrics.get("steps_success_mean")), ) ) for result_index, result in enumerate(item["results"]): episodes.append( EpisodeRecord( source_file=rel(path), item_index=item_index, result_index=result_index, method=method, task=task, boat=boat, flow_type=flow_type, context_mode=result["context_mode"], episode=int(result["episode"]), success=bool(result["success"]), final_distance=float(result["final_distance"]), mean_min_goal_distance=float(result["mean_min_goal_distance"]), energy=safe_float(result.get("energy")), path_length=safe_float(result.get("path_length")), steps=int(result["steps"]) if result.get("steps") is not None else None, ) ) return summaries, episodes def inferred_summaries(records: Iterable[SummaryRecord]) -> list[SummaryRecord]: return [r for r in records if r.context_mode == "inferred" and r.method in METHOD_ORDER] def inferred_episodes(records: Iterable[EpisodeRecord]) -> list[EpisodeRecord]: return [r for r in records if r.context_mode == "inferred" and r.method in METHOD_ORDER] def aggregate_success(records: Iterable[SummaryRecord], group_keys: tuple[str, ...]) -> list[dict[str, Any]]: acc: dict[tuple[Any, ...], dict[str, Any]] = {} sources: dict[tuple[Any, ...], set[str]] = defaultdict(set) for r in records: key = tuple(getattr(r, k) for k in group_keys) entry = acc.setdefault(key, {k: getattr(r, k) for k in group_keys} | {"successes": 0, "episodes": 0}) entry["successes"] += r.successes entry["episodes"] += r.episodes sources[key].add(r.source_file) rows = [] for key, entry in acc.items(): episodes = entry["episodes"] success_rate = entry["successes"] / episodes if episodes else math.nan rows.append(entry | {"success_rate": success_rate, "source_files": ";".join(sorted(sources[key]))}) return rows def task_sort_key(task: str) -> int: return TASK_ORDER.index(task) if task in TASK_ORDER else len(TASK_ORDER) def boat_sort_key(boat: str) -> int: return BOAT_ORDER.index(boat) if boat in BOAT_ORDER else len(BOAT_ORDER) def flow_sort_key(flow: str) -> int: return FLOW_ORDER.index(flow) if flow in FLOW_ORDER else len(FLOW_ORDER) def method_sort_key(method: str) -> int: return METHOD_ORDER.index(method) if method in METHOD_ORDER else len(METHOD_ORDER) def extract_fig3() -> None: fig_dir = OUT / "fig3" frames_dir = fig_dir / "frames" ensure_dir(frames_dir) rows: list[dict[str, Any]] = [] cell_images: dict[tuple[str, str, str], Path] = {} frame_names = ["first", "middle", "last"] for task in TASK_ORDER: flow = FIG3_TASK_FLOW[task] for boat in BOAT_ORDER: gif_path = GIF_DIR / f"image_planning_flowmo_inferred_{boat}_{task}_{flow}_ep{FIG3_EPISODE:03d}.gif" if not gif_path.exists(): raise FileNotFoundError(f"Missing GIF for Fig. 3: {gif_path}") with Image.open(gif_path) as im: n_frames = getattr(im, "n_frames", 1) frame_indices = [0, n_frames // 2, n_frames - 1] for frame_name, frame_index in zip(frame_names, frame_indices): im.seek(frame_index) frame = im.convert("RGBA") out_name = f"fig3_flowmo_inferred_{boat}_{task}_{flow}_ep{FIG3_EPISODE:03d}_{frame_name}_frame{frame_index:03d}.png" out_path = frames_dir / out_name frame.save(out_path) cell_images[(task, boat, frame_name)] = out_path rows.append( { "task": task, "boat": boat, "method": "flowmo", "context_mode": "inferred", "flow_type": flow, "episode": FIG3_EPISODE, "frame_name": frame_name, "frame_index": frame_index, "gif_frames": n_frames, "source_gif": rel(gif_path), "output_png": rel(out_path), } ) write_rows( fig_dir / "figure3_frame_manifest", rows, [ "task", "boat", "method", "context_mode", "flow_type", "episode", "frame_name", "frame_index", "gif_frames", "source_gif", "output_png", ], ) thumb_w, thumb_h = 210, 210 left = 138 top = 58 row_gap = 16 group_gap = 30 width = left + 6 * thumb_w + group_gap + 14 height = top + len(TASK_ORDER) * thumb_h + (len(TASK_ORDER) - 1) * row_gap + 14 canvas = Image.new("RGB", (width, height), "white") draw = ImageDraw.Draw(canvas) head_font = font(30, True) flow_font = font(25, False) group_font = font(30, True) twin_x = left tri_x = left + 3 * thumb_w + group_gap draw.text((twin_x + 1.5 * thumb_w - draw.textlength("Twin", font=group_font) / 2, 12), "Twin", fill=(20, 25, 30), font=group_font) draw.text((tri_x + 1.5 * thumb_w - draw.textlength("Triangle", font=group_font) / 2, 12), "Triangle", fill=(20, 25, 30), font=group_font) for row, task in enumerate(TASK_ORDER): y = top + row * (thumb_h + row_gap) draw.text((12, y + 72), TASK_LABEL[task], fill=(20, 25, 30), font=head_font) draw.text((12, y + 104), FLOW_LABEL[FIG3_TASK_FLOW[task]], fill=(80, 80, 80), font=flow_font) for boat in BOAT_ORDER: base_x = twin_x if boat == "twin" else tri_x for frame_idx, frame_name in enumerate(frame_names): x = base_x + frame_idx * thumb_w src = cell_images[(task, boat, frame_name)] img = Image.open(src).convert("RGB").resize((thumb_w, thumb_h), Image.Resampling.LANCZOS) canvas.paste(img, (x, y)) draw.rectangle([x, y, x + thumb_w, y + thumb_h], outline=(220, 220, 220), width=1) contact = fig_dir / "figure3_rollout_contact_sheet.png" trim_whitespace(canvas, pad_x=8, pad_y=0).save(contact) md_lines = [ "# Figure 3 Provenance", "", "Purpose: qualitative task rollouts extracted from experiment GIFs.", "", f"Selected method/context: `flowmo` / `inferred`.", f"Selected episode: `{FIG3_EPISODE}`.", "Layout: for each task and boat, the three adjacent frames are first, middle, and last; spacing appears only between task/boat groups.", "", "Selected flows by task:", ] for task in TASK_ORDER: md_lines.append(f"- `{task}`: `{FIG3_TASK_FLOW[task]}`") md_lines += [ "", "Frame rule: for each source GIF, extracted `first = 0`, `middle = n_frames // 2`, and `last = n_frames - 1`.", "", "Generated outputs:", f"- `{rel(contact)}`", f"- `{rel(frames_dir)}/`", f"- `{rel(fig_dir / 'figure3_frame_manifest.tsv')}`", f"- `{rel(fig_dir / 'figure3_frame_manifest.csv')}`", "", "Source GIFs:", ] for source in sorted({row["source_gif"] for row in rows}): md_lines.append(f"- `{source}`") write_text(fig_dir / "figure3_provenance.md", "\n".join(md_lines) + "\n") def load_prediction_rows() -> list[dict[str, Any]]: data = read_json(PREDICTION_JSON) rows: list[dict[str, Any]] = [] for item_index, item in enumerate(data): method = item["method"] if method not in LEARNED_METHODS: continue metrics = item["inferred"] for horizon in HORIZONS: rows.append( { "method": method, "method_label": METHOD_LABEL[method], "context_mode": "inferred", "horizon": horizon, "position_error": metrics[f"pos{horizon}"], "heading_error": metrics[f"heading{horizon}"], "source_file": rel(PREDICTION_JSON), "json_path_position": f"$[{item_index}].inferred.pos{horizon}", "json_path_heading": f"$[{item_index}].inferred.heading{horizon}", } ) return rows def draw_line_panel( draw: ImageDraw.ImageDraw, box: tuple[int, int, int, int], rows: list[dict[str, Any]], title: str, colors: dict[str, tuple[int, int, int]], compact: bool = False, show_legend: bool = True, title_y: int | None = None, ) -> None: x0, y0, x1, y1 = box axis_font = font(20 if compact else 15) title_font = font(26 if compact else 22, True) title_offset = 48 if compact else 42 draw.text((x0, title_y if title_y is not None else y0 - title_offset), title, fill=(20, 25, 30), font=title_font) draw.rectangle([x0, y0, x1, y1], outline=(210, 210, 210), width=1) if compact: pad_l, pad_b, pad_t, pad_r = 74, 68, 28, 24 else: pad_l, pad_b, pad_t, pad_r = 64, 52, 24, 20 px0, py0, px1, py1 = x0 + pad_l, y0 + pad_t, x1 - pad_r, y1 - pad_b max_y = max(float(r["position_error"]) for r in rows) * 1.12 min_h, max_h = min(HORIZONS), max(HORIZONS) for tick in [0.0, 0.25, 0.50, 0.75, 1.0]: y = py1 - tick * (py1 - py0) val = tick * max_y draw.line([px0, y, px1, y], fill=(232, 232, 232), width=1) draw.text((x0 + 8, y - 11), f"{val:.2f}", fill=(70, 70, 70), font=axis_font) draw.line([px0, py1, px1, py1], fill=(60, 60, 60), width=2) draw.line([px0, py0, px0, py1], fill=(60, 60, 60), width=2) for h in HORIZONS: x = px0 + (h - min_h) / (max_h - min_h) * (px1 - px0) draw.line([x, py1, x, py1 + 5], fill=(60, 60, 60), width=1) if (compact or (x1 - x0) < 700) and h in [3, 6, 8, 30]: continue draw.text((x - 12, py1 + 12), str(h), fill=(70, 70, 70), font=axis_font) draw.text(((px0 + px1) // 2 - (72 if compact else 60), y1 - (36 if compact else 28)), "rollout step", fill=(60, 60, 60), font=axis_font) by_method: dict[str, list[dict[str, Any]]] = defaultdict(list) for row in rows: by_method[row["method"]].append(row) for method in LEARNED_METHODS: pts = [] for row in sorted(by_method[method], key=lambda x: int(x["horizon"])): h = int(row["horizon"]) x = px0 + (h - min_h) / (max_h - min_h) * (px1 - px0) y = py1 - float(row["position_error"]) / max_y * (py1 - py0) pts.append((x, y)) if len(pts) >= 2: draw.line(pts, fill=colors[method], width=4) for x, y in pts: rr = 5 if compact else 4 draw.ellipse([x - rr, y - rr, x + rr, y + rr], fill=colors[method]) if show_legend: lx, ly = (px0 + 20, py0 + 14) if compact else (px1 - 185, py0 + 10) for i, method in enumerate(LEARNED_METHODS): if compact: col = i % 2 row = i // 2 xx = lx + col * 250 yy = ly + row * 30 else: xx = lx yy = ly + i * 24 draw.line([xx, yy + 10, xx + 32, yy + 10], fill=colors[method], width=5 if compact else 4) draw.text((xx + 42, yy - 2), METHOD_SHORT[method], fill=(40, 40, 40), font=axis_font) def draw_success_bar_panel( draw: ImageDraw.ImageDraw, box: tuple[int, int, int, int], rows: list[dict[str, Any]], title: str, ) -> None: x0, y0, x1, y1 = box axis_font = font(15) small_font = font(13) title_font = font(22, True) draw.text((x0, y0 - 42), title, fill=(20, 25, 30), font=title_font) draw.rectangle([x0, y0, x1, y1], outline=(210, 210, 210), width=1) pad_l, pad_b, pad_t, pad_r = 58, 70, 24, 18 px0, py0, px1, py1 = x0 + pad_l, y0 + pad_t, x1 - pad_r, y1 - pad_b for tick in [0, 0.25, 0.50, 0.75, 1.0]: y = py1 - tick * (py1 - py0) draw.line([px0, y, px1, y], fill=(232, 232, 232), width=1) draw.text((x0 + 10, y - 8), f"{int(tick * 100)}", fill=(70, 70, 70), font=axis_font) draw.line([px0, py1, px1, py1], fill=(60, 60, 60), width=2) draw.line([px0, py0, px0, py1], fill=(60, 60, 60), width=2) rates = {(r["method"], r["boat"]): float(r["success_rate"]) for r in rows} group_w = (px1 - px0) / len(METHOD_ORDER) bar_w = group_w * 0.30 boat_colors = {"twin": (54, 119, 191), "triangle": (218, 119, 54)} for idx, method in enumerate(METHOD_ORDER): cx = px0 + idx * group_w + group_w * 0.5 for j, boat in enumerate(BOAT_ORDER): rate = rates.get((method, boat), 0.0) x_left = cx + (j - 0.5) * bar_w - bar_w * 0.5 x_right = x_left + bar_w y_top = py1 - rate * (py1 - py0) draw.rectangle([x_left, y_top, x_right, py1], fill=boat_colors[boat], outline=(255, 255, 255)) draw.text((cx - 24, py1 + 10), METHOD_SHORT[method], fill=(55, 55, 55), font=small_font) if method == "tdmpc2": split_x = px0 + (idx + 1) * group_w draw.line([split_x, py0, split_x, py1 + 30], fill=(80, 80, 80), width=2) lx, ly = px1 - 180, py0 + 12 for i, boat in enumerate(BOAT_ORDER): yy = ly + i * 24 draw.rectangle([lx, yy, lx + 18, yy + 14], fill=boat_colors[boat]) draw.text((lx + 26, yy - 2), BOAT_LABEL[boat], fill=(40, 40, 40), font=axis_font) draw.text((x0 + 8, y0 + 8), "success rate (%)", fill=(70, 70, 70), font=axis_font) def draw_success_by_task_panel( draw: ImageDraw.ImageDraw, box: tuple[int, int, int, int], rows: list[dict[str, Any]], boat: str, title: str, show_legend: bool = False, ) -> None: x0, y0, x1, y1 = box axis_font = font(15) small_font = font(12) title_font = font(21, True) draw.text((x0, y0 - 36), title, fill=(20, 25, 30), font=title_font) draw.rectangle([x0, y0, x1, y1], outline=(210, 210, 210), width=1) pad_l, pad_b, pad_t, pad_r = 58, 58, (76 if show_legend else 26), 16 px0, py0, px1, py1 = x0 + pad_l, y0 + pad_t, x1 - pad_r, y1 - pad_b for tick in [0, 0.25, 0.50, 0.75, 1.0]: y = py1 - tick * (py1 - py0) draw.line([px0, y, px1, y], fill=(232, 232, 232), width=1) draw.text((x0 + 10, y - 8), f"{int(tick * 100)}", fill=(70, 70, 70), font=axis_font) draw.line([px0, py1, px1, py1], fill=(60, 60, 60), width=2) draw.line([px0, py0, px0, py1], fill=(60, 60, 60), width=2) draw.text((x0 + 8, y0 + 8), "success (%)", fill=(70, 70, 70), font=axis_font) rates = {(r["task"], r["method"]): float(r["success_rate"]) for r in rows if r["boat"] == boat} group_w = (px1 - px0) / len(TASK_ORDER) bar_w = group_w * 0.70 / len(METHOD_ORDER) for task_idx, task in enumerate(TASK_ORDER): group_left = px0 + task_idx * group_w + group_w * 0.15 for method_idx, method in enumerate(METHOD_ORDER): rate = rates.get((task, method), 0.0) x_left = group_left + method_idx * bar_w x_right = x_left + bar_w * 0.88 y_top = py1 - rate * (py1 - py0) draw.rectangle([x_left, y_top, x_right, py1], fill=METHOD_COLORS[method], outline=(255, 255, 255)) label = TASK_LABEL[task] tw = draw.textlength(label, font=small_font) draw.text((px0 + task_idx * group_w + (group_w - tw) / 2, py1 + 12), label, fill=(45, 45, 45), font=small_font) if task_idx > 0: split_x = px0 + task_idx * group_w draw.line([split_x, py0, split_x, py1 + 24], fill=(218, 218, 218), width=1) if show_legend: lx, ly = x0 + 78, y0 + 36 for i, method in enumerate(METHOD_ORDER): row = i // 4 col = i % 4 xx = lx + col * 145 yy = ly + row * 22 draw.rectangle([xx, yy, xx + 16, yy + 12], fill=METHOD_COLORS[method]) draw.text((xx + 22, yy - 3), METHOD_SHORT[method], fill=(40, 40, 40), font=small_font) def draw_learned_success_by_task_boat_panel( draw: ImageDraw.ImageDraw, box: tuple[int, int, int, int], rows: list[dict[str, Any]], title: str, ) -> None: x0, y0, x1, y1 = box axis_font = font(15) small_font = font(12) title_font = font(22, True) draw.text((x0, y0 - 42), title, fill=(20, 25, 30), font=title_font) draw.rectangle([x0, y0, x1, y1], outline=(210, 210, 210), width=1) pad_l, pad_b, pad_t, pad_r = 58, 86, 66, 18 px0, py0, px1, py1 = x0 + pad_l, y0 + pad_t, x1 - pad_r, y1 - pad_b for tick in [0, 0.25, 0.50, 0.75, 1.0]: y = py1 - tick * (py1 - py0) draw.line([px0, y, px1, y], fill=(232, 232, 232), width=1) draw.text((x0 + 10, y - 8), f"{int(tick * 100)}", fill=(70, 70, 70), font=axis_font) draw.line([px0, py1, px1, py1], fill=(60, 60, 60), width=2) draw.line([px0, py0, px0, py1], fill=(60, 60, 60), width=2) draw.text((x0 + 8, y0 + 8), "success (%)", fill=(70, 70, 70), font=axis_font) lx, ly = x0 + 82, y0 + 36 for i, method in enumerate(LEARNED_METHODS): xx = lx + i * 190 draw.rectangle([xx, ly, xx + 18, ly + 12], fill=METHOD_COLORS[method]) draw.text((xx + 24, ly - 4), METHOD_SHORT[method], fill=(40, 40, 40), font=small_font) rates = {(r["task"], r["boat"], r["method"]): float(r["success_rate"]) for r in rows} task_w = (px1 - px0) / len(TASK_ORDER) boat_w = task_w / len(BOAT_ORDER) bar_w = boat_w * 0.72 / len(LEARNED_METHODS) for task_idx, task in enumerate(TASK_ORDER): task_left = px0 + task_idx * task_w if task_idx > 0: draw.line([task_left, py0, task_left, py1 + 44], fill=(218, 218, 218), width=1) task_label = TASK_LABEL[task] tw = draw.textlength(task_label, font=axis_font) draw.text((task_left + (task_w - tw) / 2, py1 + 42), task_label, fill=(35, 35, 35), font=axis_font) for boat_idx, boat in enumerate(BOAT_ORDER): boat_left = task_left + boat_idx * boat_w group_left = boat_left + boat_w * 0.14 for method_idx, method in enumerate(LEARNED_METHODS): rate = rates.get((task, boat, method), 0.0) x_left = group_left + method_idx * bar_w x_right = x_left + bar_w * 0.86 y_top = py1 - rate * (py1 - py0) draw.rectangle([x_left, y_top, x_right, py1], fill=METHOD_COLORS[method], outline=(255, 255, 255)) boat_label = BOAT_LABEL[boat] bw = draw.textlength(boat_label, font=small_font) draw.text((boat_left + (boat_w - bw) / 2, py1 + 14), boat_label, fill=(55, 55, 55), font=small_font) def draw_single_column_success_panel( draw: ImageDraw.ImageDraw, box: tuple[int, int, int, int], rows: list[dict[str, Any]], boat: str, title: str, show_legend: bool = False, ) -> None: x0, y0, x1, y1 = box axis_font = font(20) small_font = font(18) title_font = font(25, True) draw.text((x0, y0 - 44), title, fill=(20, 25, 30), font=title_font) draw.rectangle([x0, y0, x1, y1], outline=(210, 210, 210), width=1) pad_l, pad_b, pad_t, pad_r = 76, 76, 70 if show_legend else 64, 24 px0, py0, px1, py1 = x0 + pad_l, y0 + pad_t, x1 - pad_r, y1 - pad_b for tick in [0, 0.25, 0.50, 0.75, 1.0]: y = py1 - tick * (py1 - py0) draw.line([px0, y, px1, y], fill=(232, 232, 232), width=1) draw.text((x0 + 12, y - 11), f"{int(tick * 100)}", fill=(70, 70, 70), font=axis_font) draw.line([px0, py1, px1, py1], fill=(60, 60, 60), width=2) draw.line([px0, py0, px0, py1], fill=(60, 60, 60), width=2) draw.text((x0 + 10, y0 + 10), "success (%)", fill=(70, 70, 70), font=axis_font) if show_legend: lx, ly = x0 + 100, y0 + 38 for i, method in enumerate(LEARNED_METHODS): xx = lx + i * 170 draw.rectangle([xx, ly, xx + 22, ly + 16], fill=METHOD_COLORS[method]) draw.text((xx + 30, ly - 5), METHOD_SHORT[method], fill=(40, 40, 40), font=small_font) rates = {(r["task"], r["method"]): float(r["success_rate"]) for r in rows if r["boat"] == boat} group_w = (px1 - px0) / len(TASK_ORDER) bar_w = group_w * 0.72 / len(LEARNED_METHODS) for task_idx, task in enumerate(TASK_ORDER): group_left = px0 + task_idx * group_w + group_w * 0.14 if task_idx > 0: split_x = px0 + task_idx * group_w draw.line([split_x, py0, split_x, py1 + 38], fill=(218, 218, 218), width=1) for method_idx, method in enumerate(LEARNED_METHODS): rate = rates.get((task, method), 0.0) x_left = group_left + method_idx * bar_w x_right = x_left + bar_w * 0.86 y_top = py1 - rate * (py1 - py0) draw.rectangle([x_left, y_top, x_right, py1], fill=METHOD_COLORS[method], outline=(255, 255, 255)) label = TASK_LABEL[task] tw = draw.textlength(label, font=small_font) draw.text((px0 + task_idx * group_w + (group_w - tw) / 2, py1 + 18), label, fill=(45, 45, 45), font=small_font) def draw_compact_success_panel( draw: ImageDraw.ImageDraw, box: tuple[int, int, int, int], rows: list[dict[str, Any]], boat: str, title: str, title_y: int | None = None, show_legend: bool = False, ) -> None: x0, y0, x1, y1 = box axis_font = font(14) small_font = font(12) title_font = font(20, True) draw.text((x0, title_y if title_y is not None else y0 - 32), title, fill=(20, 25, 30), font=title_font) draw.rectangle([x0, y0, x1, y1], outline=(210, 210, 210), width=1) pad_l, pad_b, pad_t, pad_r = 44, 48, 52, 12 px0, py0, px1, py1 = x0 + pad_l, y0 + pad_t, x1 - pad_r, y1 - pad_b y_min, y_max = 0.50, 1.00 for tick in [0.50, 0.60, 0.70, 0.80, 0.90, 1.00]: y = py1 - ((tick - y_min) / (y_max - y_min)) * (py1 - py0) draw.line([px0, y, px1, y], fill=(232, 232, 232), width=1) draw.text((x0 + 8, y - 8), f"{int(tick * 100)}", fill=(70, 70, 70), font=axis_font) draw.line([px0, py1, px1, py1], fill=(60, 60, 60), width=2) draw.line([px0, py0, px0, py1], fill=(60, 60, 60), width=2) draw.text((x0 + 8, y0 + 7), "success (%)", fill=(70, 70, 70), font=axis_font) rates = {(r["task"], r["method"]): float(r["success_rate"]) for r in rows if r["boat"] == boat} group_w = (px1 - px0) / len(TASK_ORDER) bar_w = group_w * 0.74 / len(LEARNED_METHODS) for task_idx, task in enumerate(TASK_ORDER): group_left = px0 + task_idx * group_w + group_w * 0.13 if task_idx > 0: split_x = px0 + task_idx * group_w draw.line([split_x, py0, split_x, py1 + 26], fill=(218, 218, 218), width=1) for method_idx, method in enumerate(LEARNED_METHODS): rate = rates.get((task, method), 0.0) x_left = group_left + method_idx * bar_w x_right = x_left + bar_w * 0.86 scaled = max(0.0, min(1.0, (rate - y_min) / (y_max - y_min))) y_top = py1 - scaled * (py1 - py0) draw.rectangle([x_left, y_top, x_right, py1], fill=METHOD_COLORS[method], outline=(255, 255, 255)) label = TASK_LABEL[task] tw = draw.textlength(label, font=small_font) draw.text((px0 + task_idx * group_w + (group_w - tw) / 2, py1 + 12), label, fill=(45, 45, 45), font=small_font) def make_fig4(summaries: list[SummaryRecord]) -> None: fig_dir = OUT / "fig4" ensure_dir(fig_dir) prediction_rows = load_prediction_rows() write_rows( fig_dir / "figure4_prediction_error", prediction_rows, [ "method", "method_label", "context_mode", "horizon", "position_error", "heading_error", "source_file", "json_path_position", "json_path_heading", ], ) learned_summaries = [r for r in inferred_summaries(summaries) if r.method in LEARNED_METHODS] success_rows = aggregate_success(learned_summaries, ("task", "boat", "method")) success_rows.sort(key=lambda r: (task_sort_key(r["task"]), boat_sort_key(r["boat"]), method_sort_key(r["method"]))) for row in success_rows: row["method_label"] = METHOD_LABEL[row["method"]] row["task_label"] = TASK_LABEL[row["task"]] row["boat_label"] = BOAT_LABEL[row["boat"]] write_rows( fig_dir / "figure4_success_by_task_boat", success_rows, ["task", "task_label", "boat", "boat_label", "method", "method_label", "successes", "episodes", "success_rate", "source_files"], ) source_rows = [] for r in sorted(learned_summaries, key=lambda x: (task_sort_key(x.task), boat_sort_key(x.boat), method_sort_key(x.method), flow_sort_key(x.flow_type))): source_rows.append( { "task": r.task, "boat": r.boat, "method": r.method, "flow_type": r.flow_type, "successes": r.successes, "episodes": r.episodes, "success_rate": r.success_rate, "source_file": r.source_file, "item_index": r.item_index, "json_path_successes": f"$[{r.item_index}].by_context.inferred.successes", "json_path_episodes": f"$[{r.item_index}].by_context.inferred.episodes", } ) write_rows( fig_dir / "figure4_success_by_task_boat_source_rows", source_rows, [ "task", "boat", "method", "flow_type", "successes", "episodes", "success_rate", "source_file", "item_index", "json_path_successes", "json_path_episodes", ], ) canvas = Image.new("RGB", (1800, 505), "white") draw = ImageDraw.Draw(canvas) panel_title_y = 36 draw_line_panel(draw, (34, 68, 620, 410), prediction_rows, "(A) Prediction error", METHOD_COLORS, show_legend=False, title_y=panel_title_y) draw_compact_success_panel(draw, (656, 68, 1230, 410), success_rows, "twin", "(B) Twin planning success", show_legend=False, title_y=panel_title_y) draw_compact_success_panel(draw, (1268, 68, 1766, 410), success_rows, "triangle", "(C) Triangle planning success", show_legend=False, title_y=panel_title_y) legend_font = font(18) legend_y = 462 legend_x = 485 for i, method in enumerate(LEARNED_METHODS): xx = legend_x + i * 215 draw.line([xx, legend_y + 8, xx + 34, legend_y + 8], fill=METHOD_COLORS[method], width=5) draw.text((xx + 44, legend_y - 4), METHOD_LABEL[method], fill=(40, 40, 40), font=legend_font) out = fig_dir / "figure4_prediction_and_planning.png" trim_whitespace(canvas, pad_x=8, pad_y=0).save(out) md = [ "# Paper Figure 4 Provenance", "", "Purpose: quantitative paper Figure 4 with learned-world-model prediction error curves and planning success grouped by experiment/task.", "", "Generated outputs:", f"- `{rel(out)}`", f"- `{rel(fig_dir / 'figure4_prediction_error.tsv')}`", f"- `{rel(fig_dir / 'figure4_prediction_error.csv')}`", f"- `{rel(fig_dir / 'figure4_success_by_task_boat.tsv')}`", f"- `{rel(fig_dir / 'figure4_success_by_task_boat.csv')}`", f"- `{rel(fig_dir / 'figure4_success_by_task_boat_source_rows.tsv')}`", f"- `{rel(fig_dir / 'figure4_success_by_task_boat_source_rows.csv')}`", "", "Panel (A) source:", f"- `{rel(PREDICTION_JSON)}`", "- JSON selectors: `$[method_index].inferred.pos{horizon}` and `$[method_index].inferred.heading{horizon}` for horizons 1, 3, 6, 8, 10, 20, 30, 40, 60.", "- Included methods: `flowmo`, `leworldmodel`, `planet`, `tdmpc2`.", "", "Panels (B) and (C) source:", f"- `{rel(PLANNING_DIR)}/*.json`", "- JSON selectors: `$[item_index].by_context.inferred.successes` and `$[item_index].by_context.inferred.episodes`.", "- Included methods: `flowmo`, `leworldmodel`, `planet`, `tdmpc2`.", "- Aggregation: sum successes and episodes over all flow types for each task, learned method, and boat.", "- Row-level source entries are recorded in `figure4_success_by_task_boat_source_rows.tsv/csv` with `source_file`, `item_index`, and JSON path columns.", "- Excluded diagnostic FlowMo contexts: `zero`, `shuffled`.", ] write_text(fig_dir / "figure4_provenance.md", "\n".join(md) + "\n") def draw_failure_line_panel( draw: ImageDraw.ImageDraw, box: tuple[int, int, int, int], rows: list[dict[str, Any]], boat: str, title: str, show_legend: bool = False, ) -> None: x0, y0, x1, y1 = box axis_font = font(14) small_font = font(12) title_font = font(21, True) draw.text((x0, y0 - 36), title, fill=(20, 25, 30), font=title_font) draw.rectangle([x0, y0, x1, y1], outline=(210, 210, 210), width=1) pad_l, pad_b, pad_t, pad_r = 62, 88, 26, 22 px0, py0, px1, py1 = x0 + pad_l, y0 + pad_t, x1 - pad_r, y1 - pad_b boat_rows = [r for r in rows if r["boat"] == boat] max_fail = max(float(r["failure_percent"]) for r in boat_rows) y_max = max(5.0, math.ceil((max_fail + 3.0) / 5.0) * 5.0) for tick in [0.0, 0.25, 0.50, 0.75, 1.0]: y = py1 - tick * (py1 - py0) val = tick * y_max draw.line([px0, y, px1, y], fill=(232, 232, 232), width=1) draw.text((x0 + 8, y - 8), f"{val:.0f}", fill=(70, 70, 70), font=axis_font) draw.line([px0, py1, px1, py1], fill=(60, 60, 60), width=2) draw.line([px0, py0, px0, py1], fill=(60, 60, 60), width=2) draw.text((x0 + 8, y0 + 8), "failure (%)", fill=(70, 70, 70), font=axis_font) flow_x = {} for i, flow in enumerate(FLOW_ORDER): x = px0 + i / (len(FLOW_ORDER) - 1) * (px1 - px0) flow_x[flow] = x draw.line([x, py1, x, py1 + 5], fill=(60, 60, 60), width=1) label = FLOW_LABEL[flow] draw.text((x - 28, py1 + 10), label, fill=(55, 55, 55), font=small_font) failures = {(r["method"], r["flow_type"]): float(r["failure_percent"]) for r in boat_rows} for method in METHOD_ORDER: pts = [] for flow in FLOW_ORDER: value = failures.get((method, flow), 0.0) x = flow_x[flow] y = py1 - (value / y_max) * (py1 - py0) pts.append((x, y)) draw.line(pts, fill=METHOD_COLORS[method], width=3) for x, y in pts: draw.ellipse([x - 3, y - 3, x + 3, y + 3], fill=METHOD_COLORS[method]) if show_legend: lx, ly = x0 + 80, y0 + 34 for i, method in enumerate(METHOD_ORDER): row = i // 4 col = i % 4 xx = lx + col * 145 yy = ly + row * 22 draw.line([xx, yy + 7, xx + 22, yy + 7], fill=METHOD_COLORS[method], width=3) draw.text((xx + 28, yy - 2), METHOD_SHORT[method], fill=(40, 40, 40), font=small_font) def failure_color(value: float, max_value: float = 60.0) -> tuple[int, int, int]: t = max(0.0, min(1.0, value / max_value)) stops = [ (0.00, (255, 255, 255)), (0.15, (255, 238, 210)), (0.35, (249, 177, 107)), (0.65, (220, 72, 55)), (1.00, (120, 28, 45)), ] for (t0, c0), (t1, c1) in zip(stops, stops[1:]): if t <= t1: alpha = (t - t0) / (t1 - t0) if t1 > t0 else 0.0 return tuple(int(c0[i] + alpha * (c1[i] - c0[i])) for i in range(3)) return stops[-1][1] def draw_failure_heatmap_panel( draw: ImageDraw.ImageDraw, box: tuple[int, int, int, int], rows: list[dict[str, Any]], boat: str, title: str, ) -> None: x0, y0, x1, y1 = box title_font = font(30, True) label_font = font(22, True) tick_font = font(19) cell_font = font(17, True) draw.text((x0, y0), title, fill=(20, 25, 30), font=title_font) left_w = 160 top_h = 58 heat_x0 = x0 + left_w heat_y0 = y0 + top_h heat_x1 = x1 - 22 heat_y1 = y1 - 22 cell_w = (heat_x1 - heat_x0) / len(FLOW_ORDER) cell_h = (heat_y1 - heat_y0) / len(METHOD_ORDER) values = {(r["method"], r["flow_type"]): float(r["failure_percent"]) for r in rows if r["boat"] == boat} short_flow = { "No flow": "No", "Uniform": "Uni", "Vortex": "Vort", "Double gyre": "Gyre", "Source/sink": "Src", "Src/sink pair": "Pair", "Gradient": "Grad", "Shear": "Shear", "Turbulent": "Turb", "Fourier": "Fourier", } for col, flow in enumerate(FLOW_ORDER): label = short_flow.get(FLOW_LABEL[flow], FLOW_LABEL[flow]) x = heat_x0 + col * cell_w tw = draw.textlength(label, font=tick_font) draw.text((x + (cell_w - tw) / 2, heat_y0 - 29), label, fill=(45, 45, 45), font=tick_font) for row_idx, method in enumerate(METHOD_ORDER): y = heat_y0 + row_idx * cell_h label = METHOD_SHORT[method] row_font = label_font if method == "flowmo" else tick_font draw.text((x0 + 6, y + cell_h * 0.25), label, fill=(35, 35, 35), font=row_font) if method == "pid_los_controller": draw.line([x0, y, heat_x1, y], fill=(65, 65, 65), width=3) for col, flow in enumerate(FLOW_ORDER): x = heat_x0 + col * cell_w value = values.get((method, flow), 0.0) color = failure_color(value) draw.rectangle([x, y, x + cell_w, y + cell_h], fill=color, outline=(238, 238, 238), width=1) text = f"{value:.0f}" text_color = (255, 255, 255) if value >= 36.0 else (35, 35, 35) tw = draw.textlength(text, font=cell_font) draw.text((x + (cell_w - tw) / 2, y + cell_h * 0.26), text, fill=text_color, font=cell_font) draw.rectangle([heat_x0, heat_y0, heat_x1, heat_y1], outline=(120, 120, 120), width=2) def draw_failure_colorbar(draw: ImageDraw.ImageDraw, box: tuple[int, int, int, int]) -> None: x0, y0, x1, y1 = box tick_font = font(18) draw.text((x0, y0 - 28), "failure rate (%)", fill=(40, 40, 40), font=tick_font) for i in range(x0, x1): value = (i - x0) / max(1, x1 - x0) * 60.0 draw.line([i, y0, i, y1], fill=failure_color(value), width=1) draw.rectangle([x0, y0, x1, y1], outline=(120, 120, 120), width=1) for tick in [0, 15, 30, 45, 60]: x = x0 + (tick / 60.0) * (x1 - x0) draw.line([x, y1, x, y1 + 8], fill=(50, 50, 50), width=1) draw.text((x - 12, y1 + 12), str(tick), fill=(50, 50, 50), font=tick_font) def draw_failure_colorbar_vertical(draw: ImageDraw.ImageDraw, box: tuple[int, int, int, int]) -> None: x0, y0, x1, y1 = box tick_font = font(18) label_font = font(19) for y in range(y0, y1): value = (y1 - y) / max(1, y1 - y0) * 60.0 draw.line([x0, y, x1, y], fill=failure_color(value), width=1) draw.rectangle([x0, y0, x1, y1], outline=(120, 120, 120), width=1) for tick in [0, 15, 30, 45, 60]: y = y1 - (tick / 60.0) * (y1 - y0) draw.line([x0 - 8, y, x0, y], fill=(50, 50, 50), width=1) draw.text((x1 + 8, y - 10), str(tick), fill=(50, 50, 50), font=tick_font) draw.text((x0 - 56, y0 - 34), "failure rate (%)", fill=(40, 40, 40), font=label_font) def make_fig5(summaries: list[SummaryRecord]) -> None: fig_dir = OUT / "fig5" ensure_dir(fig_dir) rows = aggregate_success(inferred_summaries(summaries), ("boat", "flow_type", "method")) rows.sort(key=lambda r: (boat_sort_key(r["boat"]), flow_sort_key(r["flow_type"]), method_sort_key(r["method"]))) for row in rows: row["boat_label"] = BOAT_LABEL[row["boat"]] row["flow_label"] = FLOW_LABEL[row["flow_type"]] row["method_label"] = METHOD_LABEL[row["method"]] row["failure_rate"] = 1.0 - float(row["success_rate"]) row["failure_percent"] = 100.0 * row["failure_rate"] row["success_percent"] = 100.0 * float(row["success_rate"]) write_rows( fig_dir / "figure5_failure_by_flow", rows, [ "boat", "boat_label", "flow_type", "flow_label", "method", "method_label", "successes", "episodes", "success_rate", "success_percent", "failure_rate", "failure_percent", "source_files", ], ) source_rows = [] for r in sorted(inferred_summaries(summaries), key=lambda x: (boat_sort_key(x.boat), flow_sort_key(x.flow_type), method_sort_key(x.method), task_sort_key(x.task))): source_rows.append( { "boat": r.boat, "flow_type": r.flow_type, "method": r.method, "task": r.task, "successes": r.successes, "episodes": r.episodes, "success_rate": r.success_rate, "failure_rate": 1.0 - r.success_rate, "source_file": r.source_file, "item_index": r.item_index, "json_path_successes": f"$[{r.item_index}].by_context.inferred.successes", "json_path_episodes": f"$[{r.item_index}].by_context.inferred.episodes", } ) write_rows( fig_dir / "figure5_failure_by_flow_source_rows", source_rows, [ "boat", "flow_type", "method", "task", "successes", "episodes", "success_rate", "failure_rate", "source_file", "item_index", "json_path_successes", "json_path_episodes", ], ) canvas = Image.new("RGB", (2400, 585), "white") draw = ImageDraw.Draw(canvas) draw_failure_heatmap_panel(draw, (30, 26, 1132, 540), rows, "twin", "(A) Twin") draw_failure_heatmap_panel(draw, (1162, 26, 2264, 540), rows, "triangle", "(B) Triangle") draw_failure_colorbar_vertical(draw, (2310, 84, 2336, 518)) out = fig_dir / "figure5_failure_by_flow.png" trim_whitespace(canvas, pad_x=8, pad_y=0).save(out) md = [ "# Paper Figure 5 Provenance", "", "Purpose: paper Figure 5 flow-family breakdown of downstream planning failure rates. Failure rate is used because many success rates are 100% or near 100%.", "", "Generated outputs:", f"- `{rel(out)}`", f"- `{rel(fig_dir / 'figure5_failure_by_flow.tsv')}`", f"- `{rel(fig_dir / 'figure5_failure_by_flow.csv')}`", f"- `{rel(fig_dir / 'figure5_failure_by_flow_source_rows.tsv')}`", f"- `{rel(fig_dir / 'figure5_failure_by_flow_source_rows.csv')}`", "", "Source:", f"- `{rel(PLANNING_DIR)}/*.json`", "- JSON selectors: `$[item_index].by_context.inferred.successes` and `$[item_index].by_context.inferred.episodes`.", "- Aggregation: sum successes and episodes over all tasks for each method, boat, and flow type.", "- Failure rate: `1 - successes / episodes`.", "- Row-level source entries are recorded in `figure5_failure_by_flow_source_rows.tsv/csv` with `source_file`, `item_index`, and JSON path columns.", "- Excluded diagnostic FlowMo contexts: `zero`, `shuffled`.", "", "Traditional controller naming:", "- `No-Flow LOS`: line-of-sight controller with no ambient-flow compensation.", "- `Current-Estimator LOS`: line-of-sight controller with an online drift/current estimate from recent pose history.", "- `Oracle-Flow LOS`: line-of-sight controller with privileged true local simulator flow feed-forward.", ] write_text(fig_dir / "figure5_provenance.md", "\n".join(md) + "\n") def make_table1(summaries: list[SummaryRecord]) -> None: table_dir = OUT / "tables" ensure_dir(table_dir) records = inferred_summaries(summaries) by_key = {(r.task, r.boat, r.flow_type, r.method): r for r in records} data_rows: list[dict[str, Any]] = [] latex_rows: list[list[str]] = [] for task in TASK_ORDER: for boat in BOAT_ORDER: for flow in FLOW_ORDER: row: dict[str, Any] = { "task": task, "task_label": TASK_LABEL[task], "boat": boat, "boat_label": BOAT_LABEL[boat], "flow_type": flow, "flow_label": FLOW_LABEL[flow], } latex_row = [TASK_LABEL[task], BOAT_LABEL[boat], FLOW_LABEL[flow]] for method in METHOD_ORDER: rec = by_key.get((task, boat, flow, method)) if rec is None: row[f"{method}_success_rate"] = "" row[f"{method}_successes"] = "" row[f"{method}_episodes"] = "" row[f"{method}_source"] = "" latex_row.append("--") else: row[f"{method}_success_rate"] = rec.success_rate row[f"{method}_success_percent"] = 100.0 * rec.success_rate row[f"{method}_successes"] = rec.successes row[f"{method}_episodes"] = rec.episodes row[f"{method}_source"] = rec.source_file row[f"{method}_json_path"] = f"$[{rec.item_index}].by_context.inferred.success_rate" latex_row.append(pct(rec.success_rate, 0)) data_rows.append(row) latex_rows.append(latex_row) fields = ["task", "task_label", "boat", "boat_label", "flow_type", "flow_label"] for method in METHOD_ORDER: fields += [ f"{method}_success_rate", f"{method}_success_percent", f"{method}_successes", f"{method}_episodes", f"{method}_source", f"{method}_json_path", ] write_rows(table_dir / "table1_success_by_task_boat_flow", data_rows, fields) header = ["Task", "Boat", "Flow"] + [METHOD_SHORT[m] for m in METHOD_ORDER] write_table( table_dir / "table1_success_by_task_boat_flow.tex", header, latex_rows, "Planning success rate by task, boat, and flow family. Values are percentages over 50 episodes per setting; NF-LOS, CE-LOS, and OF-LOS denote No-Flow LOS, Current-Estimator LOS, and Oracle-Flow LOS. FlowMo diagnostic zero/shuffled contexts are excluded.", "tab:planning_success_task_boat_flow", ) md = [ "# Table 1 Provenance", "", "Purpose: success rate for every task, boat, flow family, and method.", "", "Generated outputs:", f"- `{rel(table_dir / 'table1_success_by_task_boat_flow.tex')}`", f"- `{rel(table_dir / 'table1_success_by_task_boat_flow.tsv')}`", f"- `{rel(table_dir / 'table1_success_by_task_boat_flow.csv')}`", "", "Source:", f"- `{rel(PLANNING_DIR)}/*.json`", "- JSON selector per cell: `$[item_index].by_context.inferred.success_rate`.", "- Companion fields in TSV/CSV include `$[item_index].by_context.inferred.successes` and `episodes`.", "- Excluded diagnostic FlowMo contexts: `zero`, `shuffled`.", "", "Traditional controller naming:", "- `No-Flow LOS`: line-of-sight controller with no ambient-flow compensation.", "- `Current-Estimator LOS`: line-of-sight controller with an online drift/current estimate from recent pose history.", "- `Oracle-Flow LOS`: line-of-sight controller with privileged true local simulator flow feed-forward.", ] write_text(table_dir / "table1_provenance.md", "\n".join(md) + "\n") def make_table2(episodes: list[EpisodeRecord]) -> None: table_dir = OUT / "tables" ensure_dir(table_dir) records = inferred_episodes(episodes) grouped: dict[tuple[str, str, str], list[EpisodeRecord]] = defaultdict(list) sources: dict[tuple[str, str, str], set[str]] = defaultdict(set) for r in records: key = (r.task, r.boat, r.method) grouped[key].append(r) sources[key].add(r.source_file) data_rows: list[dict[str, Any]] = [] latex_rows: list[list[str]] = [] for task in TASK_ORDER: for boat in BOAT_ORDER: for method in METHOD_ORDER: key = (task, boat, method) items = grouped[key] if not items: continue final_distance_mean = sum(r.final_distance for r in items) / len(items) min_goal_distance_mean = sum(r.mean_min_goal_distance for r in items) / len(items) success_items = [r for r in items if r.success and r.energy is not None] energy_success_mean = sum(float(r.energy) for r in success_items) / len(success_items) if success_items else None row = { "task": task, "task_label": TASK_LABEL[task], "boat": boat, "boat_label": BOAT_LABEL[boat], "method": method, "method_label": METHOD_LABEL[method], "episodes": len(items), "successful_energy_episodes": len(success_items), "final_distance_mean": final_distance_mean, "mean_min_goal_distance": min_goal_distance_mean, "energy_success_mean": energy_success_mean if energy_success_mean is not None else "", "source_files": ";".join(sorted(sources[key])), "json_selector": "$[item_index].results[*] filtered by context_mode == inferred", } data_rows.append(row) latex_rows.append( [ TASK_LABEL[task], BOAT_LABEL[boat], METHOD_LABEL[method], fmt(final_distance_mean), fmt(energy_success_mean), ] ) write_rows( table_dir / "table2_energy_distance_by_task_boat_method", data_rows, [ "task", "task_label", "boat", "boat_label", "method", "method_label", "episodes", "successful_energy_episodes", "final_distance_mean", "mean_min_goal_distance", "energy_success_mean", "source_files", "json_selector", ], ) write_table( table_dir / "table2_energy_distance_by_task_boat_method.tex", ["Task", "Boat", "Method", "Final dist.", "Energy (succ.)"], latex_rows, "Planning distance and energy by task, boat, and method, aggregated over all flow families. Final distance is averaged over all episodes; energy is averaged over successful episodes.", "tab:planning_energy_distance_task_boat", ) md = [ "# Table 2 Provenance", "", "Purpose: distance and energy metrics by task, boat, and method.", "", "Generated outputs:", f"- `{rel(table_dir / 'table2_energy_distance_by_task_boat_method.tex')}`", f"- `{rel(table_dir / 'table2_energy_distance_by_task_boat_method.tsv')}`", f"- `{rel(table_dir / 'table2_energy_distance_by_task_boat_method.csv')}`", "", "Source:", f"- `{rel(PLANNING_DIR)}/*.json`", "- JSON selector: `$[item_index].results[*]`, filtered to `context_mode == inferred`.", "- Final distance: mean of `final_distance` over all filtered episodes.", "- Energy: mean of `energy` over filtered successful episodes only.", "- The TSV/CSV also includes `mean_min_goal_distance`, computed from the same filtered episodes.", "- Excluded diagnostic FlowMo contexts: `zero`, `shuffled`.", "", "Traditional controller naming:", "- `No-Flow LOS`: line-of-sight controller with no ambient-flow compensation.", "- `Current-Estimator LOS`: line-of-sight controller with an online drift/current estimate from recent pose history.", "- `Oracle-Flow LOS`: line-of-sight controller with privileged true local simulator flow feed-forward.", ] write_text(table_dir / "table2_provenance.md", "\n".join(md) + "\n") def make_probe_table() -> None: table_dir = OUT / "tables" ensure_dir(table_dir) data = read_json(PROBE_JSON) split = data["splits"]["test"] feature_order = ["z", "c", "z_c"] target_order = ["momentum", "local_flow", "episode_drift"] rows: list[dict[str, Any]] = [] latex_rows: list[list[str]] = [] for target in target_order: latex_row = [target.replace("_", " ")] for feature in feature_order: entry = split[target][feature] rows.append( { "split": "test", "target": target, "feature": feature, "r2_mean": entry["r2_mean"], "rmse": entry["rmse"], "source_file": rel(PROBE_JSON), "json_path_r2": f"$.splits.test.{target}.{feature}.r2_mean", "json_path_rmse": f"$.splits.test.{target}.{feature}.rmse", } ) latex_row.append(fmt(entry["r2_mean"], 3)) for feature in feature_order: latex_row.append(fmt(split[target][feature]["rmse"], 3)) latex_rows.append(latex_row) write_rows( table_dir / "table3_probe_diagnostics", rows, ["split", "target", "feature", "r2_mean", "rmse", "source_file", "json_path_r2", "json_path_rmse"], ) write_table( table_dir / "table3_probe_diagnostics.tex", ["Target", "z R2", "c R2", "z+c R2", "z RMSE", "c RMSE", "z+c RMSE"], latex_rows, "Frozen linear probe diagnostics for FlowMo representations on the test split.", "tab:flowmo_probe_diagnostics", ) md = [ "# Table 3 Provenance", "", "Purpose: frozen linear probe diagnostics for FlowMo latent variables.", "", "Generated outputs:", f"- `{rel(table_dir / 'table3_probe_diagnostics.tex')}`", f"- `{rel(table_dir / 'table3_probe_diagnostics.tsv')}`", f"- `{rel(table_dir / 'table3_probe_diagnostics.csv')}`", "", "Source:", f"- `{rel(PROBE_JSON)}`", "- JSON selectors: `$.splits.test...r2_mean` and `$.splits.test...rmse`.", "- Probe model: frozen FlowMo features with ridge regression, as reported by the JSON metadata.", ] write_text(table_dir / "table3_provenance.md", "\n".join(md) + "\n") def make_overview() -> None: files = sorted(p for p in OUT.rglob("*") if p.is_file()) lines = [ "# Paper Artifact Export", "", "Generated from local experiment outputs under `experiments/reports/`.", "", "Important source files:", f"- `{rel(PREDICTION_JSON)}`", f"- `{rel(PROBE_JSON)}`", f"- `{rel(PLANNING_DIR)}/*.json`", f"- `{rel(GIF_DIR)}/*.gif`", "", "Traditional controller names used in these exports:", ] for method in TRADITIONAL_METHODS: lines.append(f"- `{METHOD_LABEL[method]}`: {METHOD_DESCRIPTION[method]}") lines += [ "", "Generated files:", ] for p in files: lines.append(f"- `{rel(p)}`") write_text(OUT / "README.md", "\n".join(lines) + "\n") def validate_inputs() -> None: missing = [p for p in [PREDICTION_JSON, PROBE_JSON, PLANNING_DIR, GIF_DIR] if not p.exists()] if missing: raise FileNotFoundError("Missing required experiment outputs: " + ", ".join(str(p) for p in missing)) planning_files = sorted(PLANNING_DIR.glob("*.json")) if len(planning_files) != len(TASK_ORDER) * len(BOAT_ORDER) * len(FLOW_ORDER): raise RuntimeError(f"Expected {len(TASK_ORDER) * len(BOAT_ORDER) * len(FLOW_ORDER)} planning JSON files, found {len(planning_files)}") def main() -> None: validate_inputs() if OUT.exists(): shutil.rmtree(OUT) ensure_dir(OUT) summaries, episodes = load_planning() extract_fig3() make_fig4(summaries) make_fig5(summaries) make_table1(summaries) make_table2(episodes) make_probe_table() make_overview() print(f"Wrote paper artifacts to {OUT}") if __name__ == "__main__": main()