Spaces:
Running
Running
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import time | |
| from pathlib import Path | |
| from typing import Any | |
| import httpx | |
| from trajectory_lib import DEFAULT_ENV_SERVER_URL, load_json, post_json, summarize_step | |
| JsonDict = dict[str, Any] | |
| def parse_args() -> argparse.Namespace: | |
| parser = argparse.ArgumentParser(description="Replay a saved trajectory against a fresh environment reset.") | |
| parser.add_argument("trajectory", help="Path to a trajectory directory or its manifest.json file.") | |
| parser.add_argument("--env-url", default=DEFAULT_ENV_SERVER_URL, help="Base URL of the environment server.") | |
| parser.add_argument("--pause-ms", type=int, default=400, help="Delay between replayed actions.") | |
| parser.add_argument( | |
| "--interactive", | |
| action="store_true", | |
| help="Wait for Enter before each action so the browser can be stepped through manually.", | |
| ) | |
| parser.add_argument("--stop-on-done", action="store_true", help="Stop when the replayed environment reports done=true.") | |
| return parser.parse_args() | |
| def resolve_manifest(path_value: str) -> Path: | |
| path = Path(path_value) | |
| if path.is_dir(): | |
| path = path / "manifest.json" | |
| if not path.exists(): | |
| raise FileNotFoundError(f"Manifest not found: {path}") | |
| return path | |
| def load_steps(steps_path: Path) -> list[JsonDict]: | |
| rows: list[JsonDict] = [] | |
| with steps_path.open(encoding="utf-8") as handle: | |
| for line in handle: | |
| line = line.strip() | |
| if line: | |
| rows.append(json.loads(line)) | |
| return rows | |
| def main() -> None: | |
| args = parse_args() | |
| manifest_path = resolve_manifest(args.trajectory) | |
| manifest = load_json(manifest_path) | |
| trajectory_dir = manifest_path.parent | |
| steps = load_steps(trajectory_dir / manifest["steps_file"]) | |
| reset_request = manifest.get("reset_request") or None | |
| action_steps = [step for step in steps if step["kind"] == "step"] | |
| with httpx.Client(base_url=args.env_url, timeout=60.0) as client: | |
| reset_response = post_json(client, "/reset", reset_request) | |
| print(f"reset | {summarize_step(reset_response)}") | |
| for step in action_steps: | |
| action = step["action"] | |
| if args.interactive: | |
| print(f"next[{step['index']}]={action['type']} | action={json.dumps(action)}") | |
| input("Press Enter to replay this action...") | |
| response = post_json(client, "/step", action) | |
| print(f"replay[{step['index']}]={action['type']} | {summarize_step(response)}") | |
| if args.stop_on_done and response.get("done"): | |
| break | |
| if not args.interactive: | |
| time.sleep(max(args.pause_ms, 0) / 1000) | |
| print(f"replayed {len(action_steps)} actions from {trajectory_dir}") | |
| if __name__ == "__main__": | |
| main() | |