| |
| """Generate synthetic multi-robot planning data for fine-tuning a planner LLM. |
| |
| Uses AGORA's heuristic DecisionEngine to produce ground-truth task allocations |
| across diverse team compositions, task sets, and failure scenarios. Outputs a |
| JSONL dataset suitable for instruction-tuning with TRL/SFT. |
| |
| Output: /mnt/artifacts-datai/logs/project_agora/planning_train.jsonl (training split). |
| A 200-example eval split is written next to it as planning_eval.jsonl. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import json |
| import random |
| import sys |
| import uuid |
| from dataclasses import dataclass |
| from datetime import datetime, timedelta, timezone |
| from pathlib import Path |
|
|
| |
| sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src")) |
|
|
| from anima_agora.control.brain import Brain, BrainConfig |
| from anima_agora.control.contracts import TaskRequest |
| from anima_agora.memory.stem_core import ( |
| EmbodimentProfile, |
| Pose, |
| Quaternion, |
| RobotCapability, |
| RobotState, |
| SceneGraph, |
| SemanticLandmark, |
| STEMMemoryState, |
| TaskEvent, |
| TaskStatus, |
| Vector3D, |
| ) |
|
|
| |
| |
| |
|
|
# Robot archetypes sampled by build_scenario(). Each entry is a
# (robot_type, capability_categories, sensors) tuple; the capability
# categories are turned into RobotCapability records by make_robot().
ROBOT_TYPES = [
    ("manipulator", ["manipulation"], {"arm": "6DOF", "gripper": "parallel"}),
    ("mobile_base", ["navigation"], {"lidar": "2D", "camera": "RGB"}),
    ("drone", ["navigation", "sensing"], {"camera": "RGBD", "gps": "RTK"}),
    ("humanoid", ["manipulation", "navigation"], {"camera": "stereo", "imu": "9DOF"}),
    ("agv", ["navigation"], {"lidar": "3D", "ultrasonic": "array"}),
    ("inspection_bot", ["sensing", "navigation"], {"thermal": "FLIR", "camera": "4K"}),
]
|
|
# Named places used for robot start locations, generated scenes, task
# targets, and the target locations of past task events.
LOCATIONS = [
    "kitchen", "living_room", "bedroom", "bathroom", "garage",
    "warehouse_a", "warehouse_b", "loading_dock", "office",
    "lab", "hallway", "entrance", "storage_room", "rooftop",
]
|
|
# Object names placed into generated scenes and referenced by task requests
# and past task events.
OBJECTS = [
    "mug", "plate", "bottle", "box", "tool", "book", "laptop",
    "sensor_module", "battery_pack", "cable", "wrench", "package",
    "sample_container", "fire_extinguisher", "first_aid_kit",
]
|
|
# Task-name templates keyed by task category. generate_task_requests() fills
# them with str.format(obj=..., loc=..., loc2=...); only the "mixed"
# templates use {loc2}.
TASK_TEMPLATES = {
    "manipulation": [
        "pick up {obj} from {loc}",
        "place {obj} on counter in {loc}",
        "grasp {obj} and carry to {loc}",
        "lift {obj} from shelf in {loc}",
    ],
    "navigation": [
        "navigate to {loc}",
        "patrol {loc} perimeter",
        "move to {loc} for inspection",
        "drive to {loc} waypoint",
    ],
    "sensing": [
        "inspect {loc} for anomalies",
        "scan {obj} in {loc}",
        "observe {loc} environment",
        "detect obstacles in {loc}",
    ],
    # For "mixed" tasks the required capabilities are chosen by whether the
    # formatted task name contains "pick" (see generate_task_requests).
    "mixed": [
        "pick up {obj} from {loc} and deliver to {loc2}",
        "navigate to {loc} then inspect {obj}",
        "scan {loc} and pick up any {obj} found",
    ],
}
|
|
|
|
| |
| |
| |
|
|
def make_capability(name: str, category: str, success_rate: float = 0.9) -> RobotCapability:
    """Create a capability record with a unique id and a random execution time.

    Args:
        name: Capability name; also embedded in the generated capability id.
        category: Capability category label.
        success_rate: Requested success rate; clamped into [0.1, 1.0].

    Returns:
        A ``RobotCapability`` whose ``avg_execution_time`` is drawn uniformly
        between 5.0 and 30.0.
    """
    clamped_rate = max(0.1, min(1.0, success_rate))
    id_suffix = uuid.uuid4().hex[:6]
    execution_time = random.uniform(5.0, 30.0)
    return RobotCapability(
        capability_id=f"cap_{name}_{id_suffix}",
        name=name,
        category=category,
        success_rate=clamped_rate,
        avg_execution_time=execution_time,
    )
|
|
|
|
def make_robot(
    robot_id: str,
    robot_type: str,
    cap_categories: list[str],
    sensors: dict[str, str],
    *,
    battery: float | None = None,
    state: RobotState = RobotState.IDLE,
    location: str | None = None,
) -> EmbodimentProfile:
    """Build a robot profile with randomised physical attributes.

    Args:
        robot_id: Identifier stored on the profile.
        robot_type: Robot archetype label.
        cap_categories: One capability is created per category, with a
            success rate drawn uniformly from [0.6, 0.99].
        sensors: Sensor mapping stored on the profile as given.
        battery: Battery percentage; drawn uniformly from [20, 100] when None.
        state: Current robot state.
        location: Location name; a random entry of ``LOCATIONS`` when empty
            or None.

    Returns:
        The populated ``EmbodimentProfile``.
    """
    # Draw order matters for seeded reproducibility: per-capability draws
    # first, then the physical attributes, then battery and location.
    generated_caps = (
        make_capability(category, category, success_rate=random.uniform(0.6, 0.99))
        for category in cap_categories
    )
    capabilities = {cap.capability_id: cap for cap in generated_caps}

    mass_kg = random.uniform(5.0, 80.0)
    height_m = random.uniform(0.3, 1.8)
    max_speed = random.uniform(0.5, 3.0)
    capacity_wh = random.uniform(50.0, 500.0)

    if battery is None:
        battery_pct = random.uniform(20.0, 100.0)
    else:
        battery_pct = battery

    if not location:
        location = random.choice(LOCATIONS)

    return EmbodimentProfile(
        robot_id=robot_id,
        robot_type=robot_type,
        mass_kg=mass_kg,
        height_m=height_m,
        max_speed_m_s=max_speed,
        battery_capacity_wh=capacity_wh,
        sensors=sensors,
        capabilities=capabilities,
        current_state=state,
        battery_pct=battery_pct,
        location=location,
    )
|
|
|
|
def make_scene(location: str, n_objects: int = 3) -> SceneGraph:
    """Build a scene graph for ``location`` holding randomly placed objects.

    Args:
        location: Location name stored on the scene and embedded in its id.
        n_objects: Number of distinct objects to sample from ``OBJECTS``
            (capped at the size of ``OBJECTS``).

    Returns:
        A ``SceneGraph`` whose ``objects`` mapping is keyed by object name.
    """
    captured_at = datetime.now(timezone.utc)
    sample_size = min(n_objects, len(OBJECTS))

    landmarks = {}
    for name in random.sample(OBJECTS, sample_size):
        landmark_id = f"lm_{name}_{uuid.uuid4().hex[:4]}"
        position = Vector3D(
            x=random.uniform(-5, 5),
            y=random.uniform(-5, 5),
            z=random.uniform(0, 2),
        )
        pose = Pose(
            position=position,
            orientation=Quaternion(x=0, y=0, z=0, w=1),
            timestamp=captured_at,
        )
        landmarks[name] = SemanticLandmark(
            landmark_id=landmark_id,
            name=name,
            pose=pose,
            category="object",
        )

    scene_id = f"scene_{location}_{uuid.uuid4().hex[:6]}"
    return SceneGraph(
        scene_id=scene_id,
        timestamp=captured_at,
        robot_id="observer",
        location_name=location,
        objects=landmarks,
    )
|
|
|
|
def make_task_history(
    robot_ids: list[str],
    n_events: int = 5,
) -> list[TaskEvent]:
    """Fabricate a list of past task events for the given robots.

    Each event starts between 0.5 and 6 hours before now, lasts 10-120
    seconds, and succeeds with a probability of roughly 0.8. Events are
    returned in generation order, which is not chronological.

    Args:
        robot_ids: Robots the events may be attributed to.
        n_events: Number of events to generate.

    Returns:
        The generated events; an empty list when ``robot_ids`` is empty or
        ``n_events`` is not positive.
    """
    # A team without robots has no history; random.choice() would raise
    # IndexError on the empty list.
    if not robot_ids:
        return []

    past_task_names = [
        "pick up mug", "navigate to kitchen", "inspect warehouse_a",
        "place box on counter", "patrol hallway",
    ]
    now = datetime.now(timezone.utc)
    events: list[TaskEvent] = []
    for _ in range(n_events):
        # The order of the random draws below is kept stable so that a given
        # seed keeps producing the same history.
        robot_id = random.choice(robot_ids)
        start = now - timedelta(hours=random.uniform(0.5, 6.0))
        end = start + timedelta(seconds=random.uniform(10, 120))
        success = random.random() > 0.2
        task_name = random.choice(past_task_names)
        event_id = f"evt_{uuid.uuid4().hex[:8]}"
        target_location = random.choice(LOCATIONS)
        target_objects = [random.choice(OBJECTS)] if random.random() > 0.5 else []
        actions_planned = random.randint(1, 5)
        if success:
            actions_completed = actions_planned
        else:
            # Failed tasks get at most two completed actions.
            actions_completed = random.randint(0, min(actions_planned, 2))

        events.append(TaskEvent(
            event_id=event_id,
            task_name=task_name,
            robot_id=robot_id,
            start_time=start,
            end_time=end,
            status=TaskStatus.COMPLETED if success else TaskStatus.FAILED,
            success=success,
            target_location=target_location,
            target_objects=target_objects,
            actions_planned=actions_planned,
            actions_completed=actions_completed,
        ))
    return events
|
|
|
|
def generate_task_requests(
    n_tasks: int,
    *,
    with_dependencies: bool = False,
) -> list[TaskRequest]:
    """Generate ``n_tasks`` random task requests with ids ``task_000``, ``task_001``, ...

    Args:
        n_tasks: Number of requests to create.
        with_dependencies: When True, every task after the first may (with a
            probability of roughly 0.4) depend on one randomly chosen
            earlier task.

    Returns:
        The requests in creation order.
    """
    categories = ["manipulation", "navigation", "sensing", "mixed"]
    generated = []
    for index in range(n_tasks):
        category = random.choice(categories)
        template = random.choice(TASK_TEMPLATES[category])
        loc = random.choice(LOCATIONS)
        other_locations = [name for name in LOCATIONS if name != loc]
        loc2 = random.choice(other_locations)
        obj = random.choice(OBJECTS)
        task_name = template.format(obj=obj, loc=loc, loc2=loc2)

        # Single-category tasks need exactly that capability; mixed tasks
        # need navigation plus either manipulation or sensing.
        caps: tuple[str, ...]
        if category != "mixed":
            caps = (category,)
        elif "pick" in task_name:
            caps = ("manipulation", "navigation")
        else:
            caps = ("sensing", "navigation")

        dep_ids: tuple[str, ...] = ()
        if with_dependencies and index > 0:
            if random.random() > 0.6:
                earlier = generated[random.randint(0, index - 1)]
                dep_ids = (earlier.task_id,)

        target_objects = (obj,) if random.random() > 0.3 else ()
        priority = random.randint(0, 3)
        generated.append(TaskRequest(
            task_id=f"task_{index:03d}",
            task_name=task_name,
            required_capabilities=caps,
            target_location=loc,
            target_objects=target_objects,
            priority=priority,
            dependency_ids=dep_ids,
        ))
    return generated
|
|
|
|
def build_scenario(
    n_robots: int = 3,
    n_tasks: int = 4,
    *,
    include_offline: bool = False,
    include_low_battery: bool = False,
    with_dependencies: bool = False,
    include_history: bool = True,
    include_scenes: bool = True,
) -> tuple[STEMMemoryState, list[TaskRequest]]:
    """Assemble one random planning scenario.

    Args:
        n_robots: Number of robots; ids are ``robot_00``, ``robot_01``, ...
        n_tasks: Number of task requests to generate.
        include_offline: Mark the last robot as OFFLINE.
        include_low_battery: Give the first robot a battery level drawn
            uniformly between 3 and 8 percent.
        with_dependencies: Allow tasks to depend on earlier tasks.
        include_history: Attach between 2 and 8 fabricated past task events.
        include_scenes: Attach scene graphs for three random locations.

    Returns:
        A ``(memory_state, task_requests)`` pair.
    """
    robots = {}
    for index in range(n_robots):
        robot_type, categories, sensors = random.choice(ROBOT_TYPES)
        robot_id = f"robot_{index:02d}"

        is_last = index == n_robots - 1
        robot_state = RobotState.OFFLINE if include_offline and is_last else RobotState.IDLE

        low_battery = None
        if include_low_battery and index == 0:
            low_battery = random.uniform(3.0, 8.0)

        robots[robot_id] = make_robot(
            robot_id, robot_type, categories, sensors,
            battery=low_battery, state=robot_state,
        )
    robot_ids = list(robots)

    scenes = {}
    if include_scenes:
        scene_locations = random.sample(LOCATIONS, min(3, len(LOCATIONS)))
        for scene in map(make_scene, scene_locations):
            scenes[scene.scene_id] = scene

    if include_history:
        history = make_task_history(robot_ids, n_events=random.randint(2, 8))
    else:
        history = []

    task_requests = generate_task_requests(n_tasks, with_dependencies=with_dependencies)

    memory_state = STEMMemoryState(
        robot_profiles=robots,
        scenes=scenes,
        task_history=history,
    )
    return memory_state, task_requests
|
|
|
|
| |
| |
| |
|
|
# System message placed at the start of every generated chat example. It
# names the allocation criteria and the JSON keys ("assignments",
# "reasoning", "unassigned") that allocation_to_response() emits.
SYSTEM_PROMPT = """You are AGORA, a multi-robot task planner. Given the current team state and task requests, assign each task to the best robot. Consider:
- Robot capabilities (manipulation, navigation, sensing)
- Battery levels (low battery robots should get fewer tasks)
- Location proximity (prefer robots already near the task location)
- Recent failures (avoid re-assigning failed tasks to the same robot)
- Task dependencies (respect ordering constraints)
- Load balancing (distribute tasks evenly)

Respond with a JSON object containing:
- "assignments": {robot_id: [task_ids]}
- "reasoning": brief explanation of allocation decisions
- "unassigned": [task_ids that couldn't be assigned, with reasons]"""
|
|
|
|
def state_to_context(state: STEMMemoryState, tasks: list[TaskRequest]) -> str:
    """Render the team state and pending tasks as the user-turn prompt.

    The text has a "Team State" section (robots sorted by id), optional
    "Known Scenes" and "Recent Failures" sections, a "Task Requests"
    section, and a closing instruction line.

    Args:
        state: Memory snapshot providing ``robot_profiles``, ``scenes`` and
            ``task_history``.
        tasks: Task requests to list, in the order given.

    Returns:
        The prompt text, with entries joined by newlines.
    """
    out = ["## Team State\n"]
    profiles = state.robot_profiles
    for robot_id in sorted(profiles):
        robot = profiles[robot_id]
        categories = ", ".join(cap.category for cap in robot.capabilities.values())
        out.append(
            f"- **{robot_id}** ({robot.robot_type}): "
            f"battery={robot.battery_pct:.0f}%, state={robot.current_state.value}, "
            f"location={robot.location}, capabilities=[{categories}], "
            f"speed={robot.max_speed_m_s:.1f}m/s"
        )

    if state.scenes:
        out.append("\n## Known Scenes\n")
        for scene in state.scenes.values():
            object_list = ", ".join(sorted(scene.objects.keys()))
            out.append(f"- {scene.location_name}: objects=[{object_list}]")

    failed_events = [event for event in state.task_history if not event.success]
    if failed_events:
        out.append("\n## Recent Failures\n")
        # Only the last five failures, in history order, are listed.
        out.extend(
            f"- {event.robot_id} failed '{event.task_name}' at {event.target_location}"
            for event in failed_events[-5:]
        )

    out.append("\n## Task Requests\n")
    for task in tasks:
        if task.required_capabilities:
            caps_text = ", ".join(task.required_capabilities)
        else:
            caps_text = "any"

        # Optional suffixes appear only when the task actually has them.
        extras = ""
        if task.dependency_ids:
            extras += f", depends_on=[{', '.join(task.dependency_ids)}]"
        if task.target_objects:
            extras += f", objects=[{', '.join(task.target_objects)}]"

        out.append(
            f'- **{task.task_id}**: "{task.task_name}" '
            f"(caps=[{caps_text}], location={task.target_location}, "
            f"priority={task.priority}{extras})"
        )

    out.append("\nAssign each task to the best robot. Return JSON.")
    return "\n".join(out)
|
|
|
|
def allocation_to_response(
    plan,
    tasks: list[TaskRequest],
) -> str:
    """Serialise a plan into the JSON text used as the assistant turn.

    Args:
        plan: Plan object providing ``assignments`` (robot id -> assignment
            objects with ``task_id``), ``unassigned_tasks``,
            ``failure_reasons`` and ``reasoning``.
        tasks: The task requests the plan was made for; not read here.

    Returns:
        JSON text (2-space indent) with the keys ``assignments``,
        ``reasoning`` and ``unassigned``.
    """
    per_robot = {
        robot_id: [item.task_id for item in robot_items]
        for robot_id, robot_items in plan.assignments.items()
    }
    # Tasks without a recorded reason fall back to a generic explanation.
    leftovers = [
        {
            "task_id": task.task_id,
            "reason": plan.failure_reasons.get(task.task_id, "no suitable robot"),
        }
        for task in plan.unassigned_tasks
    ]
    payload = {
        "assignments": per_robot,
        "reasoning": plan.reasoning,
        "unassigned": leftovers,
    }
    return json.dumps(payload, indent=2)
|
|
|
|
| |
| |
| |
|
|
@dataclass
class DatasetStats:
    """Counters summarising the training split produced by generate_dataset()."""

    # Number of training examples written.
    total: int = 0
    # Examples whose plan left no task unassigned.
    fully_assigned: int = 0
    # Examples with unassigned tasks but a non-empty assignments mapping.
    partial: int = 0
    # Examples with unassigned tasks and an empty assignments mapping.
    empty: int = 0
    # Examples generated with the dependency option switched on (individual
    # tasks may still end up without dependencies).
    with_deps: int = 0
    # Examples whose task history contains at least one failed event.
    with_failures: int = 0
    # Mean number of robots per training example.
    avg_robots: float = 0.0
    # Mean number of tasks per training example.
    avg_tasks: float = 0.0
|
|
|
|
def _eval_path_for(train_path: str) -> str:
    """Return the eval-split path that sits next to ``train_path``."""
    train_suffix = "_train.jsonl"
    if train_path.endswith(train_suffix):
        return train_path[: -len(train_suffix)] + "_eval.jsonl"
    # Without the conventional suffix a plain str.replace() is a no-op, which
    # made the eval split overwrite the training file. Derive a distinct name.
    path = Path(train_path)
    return str(path.with_name(f"{path.stem}_eval{path.suffix}"))


def _build_example(state, tasks, plan) -> dict:
    """Return one chat-format record for a scenario and the plan made for it."""
    return {
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": state_to_context(state, tasks)},
            {"role": "assistant", "content": allocation_to_response(plan, tasks)},
        ],
    }


async def generate_dataset(
    n_examples: int = 5000,
    output_path: str | None = None,
    seed: int = 42,
    n_eval: int = 200,
) -> DatasetStats:
    """Generate the training split and a companion eval split as JSONL.

    Every line is a JSON object with a ``messages`` list (system, user,
    assistant). The assistant turn is the plan returned by the heuristic
    ``Brain`` for a randomly built scenario.

    Args:
        n_examples: Number of training examples to write.
        output_path: Training JSONL path; a project default is used when
            None. Missing parent directories are created.
        seed: Seed for the ``random`` module for the training split; the
            eval split uses ``seed + 1``.
        n_eval: Number of eval examples to write.

    Returns:
        Statistics about the training split only.
    """
    random.seed(seed)
    if output_path is None:
        output_path = "/mnt/artifacts-datai/logs/project_agora/planning_train.jsonl"
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    brain = Brain(BrainConfig(mllm_provider="heuristic"))
    stats = DatasetStats()
    total_robots = 0
    total_tasks = 0

    with open(output_path, "w", encoding="utf-8") as f:
        for i in range(n_examples):
            # The order of these draws is part of the seeded behaviour.
            n_robots = random.randint(2, 6)
            n_tasks = random.randint(1, 8)
            with_deps = random.random() > 0.4
            include_offline = random.random() > 0.7
            include_low_battery = random.random() > 0.6
            include_history = random.random() > 0.2

            state, tasks = build_scenario(
                n_robots=n_robots,
                n_tasks=n_tasks,
                include_offline=include_offline,
                include_low_battery=include_low_battery,
                with_dependencies=with_deps,
                include_history=include_history,
            )

            plan = await brain.plan_team_tasks(state, tasks)
            f.write(json.dumps(_build_example(state, tasks, plan)) + "\n")

            stats.total += 1
            total_robots += n_robots
            total_tasks += n_tasks
            if not plan.unassigned_tasks:
                stats.fully_assigned += 1
            elif plan.assignments:
                stats.partial += 1
            else:
                stats.empty += 1
            if with_deps:
                stats.with_deps += 1
            if any(not e.success for e in state.task_history):
                stats.with_failures += 1

            if (i + 1) % 500 == 0:
                print(f" Generated {i + 1}/{n_examples} examples...")

    # max(..., 1) avoids a division by zero when no examples were requested.
    stats.avg_robots = total_robots / max(n_examples, 1)
    stats.avg_tasks = total_tasks / max(n_examples, 1)

    eval_path = _eval_path_for(output_path)
    random.seed(seed + 1)
    with open(eval_path, "w", encoding="utf-8") as f:
        for _ in range(n_eval):
            n_robots = random.randint(2, 6)
            n_tasks = random.randint(2, 6)
            state, tasks = build_scenario(
                n_robots=n_robots,
                n_tasks=n_tasks,
                with_dependencies=random.random() > 0.5,
                include_offline=random.random() > 0.7,
                include_low_battery=random.random() > 0.6,
            )
            plan = await brain.plan_team_tasks(state, tasks)
            # The prompt is rendered from THIS scenario; reusing the prompt
            # variable of the training loop paired every eval answer with an
            # unrelated question.
            f.write(json.dumps(_build_example(state, tasks, plan)) + "\n")

    print(f"\nDataset saved to: {output_path}")
    print(f"Eval split saved to: {eval_path}")
    return stats
|
|
|
|
def _main() -> None:
    """Parse the command line, generate the dataset, and print its statistics."""
    import argparse

    parser = argparse.ArgumentParser(description="Generate AGORA planning training data")
    parser.add_argument("--n-examples", type=int, default=5000, help="Number of training examples")
    parser.add_argument("--seed", type=int, default=42, help="Random seed")
    parser.add_argument(
        "--output",
        default="/mnt/artifacts-datai/logs/project_agora/planning_train.jsonl",
        help="Output JSONL path",
    )
    options = parser.parse_args()

    run = generate_dataset(
        n_examples=options.n_examples,
        output_path=options.output,
        seed=options.seed,
    )
    stats = asyncio.run(run)

    report = [
        "\n=== Dataset Statistics ===",
        f"Total examples: {stats.total}",
        f"Fully assigned: {stats.fully_assigned}",
        f"Partial: {stats.partial}",
        f"Empty (no robots): {stats.empty}",
        f"With dependencies: {stats.with_deps}",
        f"With failures: {stats.with_failures}",
        f"Avg robots/scene: {stats.avg_robots:.1f}",
        f"Avg tasks/scene: {stats.avg_tasks:.1f}",
    ]
    for line in report:
        print(line)


if __name__ == "__main__":
    _main()
|
|