#!/usr/bin/env python3
"""Generate synthetic multi-robot planning data for fine-tuning a planner LLM.

Uses AGORA's heuristic planner (``Brain`` configured with
``mllm_provider="heuristic"``) to produce ground-truth task allocations across
diverse team compositions, task sets, and failure scenarios. Outputs a JSONL
dataset of chat-format examples suitable for instruction-tuning with TRL/SFT,
plus a small ``*_eval.jsonl`` split written next to it.

Output: /mnt/artifacts-datai/logs/project_agora/planning_train.jsonl
"""

from __future__ import annotations

import asyncio
import json
import random
import sys
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path

# Ensure the package is importable when running this script from a checkout.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))

from anima_agora.control.brain import Brain, BrainConfig  # noqa: E402
from anima_agora.control.contracts import TaskRequest  # noqa: E402
from anima_agora.memory.stem_core import (  # noqa: E402
    EmbodimentProfile,
    Pose,
    Quaternion,
    RobotCapability,
    RobotState,
    SceneGraph,
    SemanticLandmark,
    STEMMemoryState,
    TaskEvent,
    TaskStatus,
    Vector3D,
)

# ---------------------------------------------------------------------------
# Constants for scenario generation
# ---------------------------------------------------------------------------

# Each entry: (robot_type, capability categories, sensor name -> sensor spec).
ROBOT_TYPES = [
    ("manipulator", ["manipulation"], {"arm": "6DOF", "gripper": "parallel"}),
    ("mobile_base", ["navigation"], {"lidar": "2D", "camera": "RGB"}),
    ("drone", ["navigation", "sensing"], {"camera": "RGBD", "gps": "RTK"}),
    ("humanoid", ["manipulation", "navigation"], {"camera": "stereo", "imu": "9DOF"}),
    ("agv", ["navigation"], {"lidar": "3D", "ultrasonic": "array"}),
    ("inspection_bot", ["sensing", "navigation"], {"thermal": "FLIR", "camera": "4K"}),
]

LOCATIONS = [
    "kitchen", "living_room", "bedroom", "bathroom", "garage",
    "warehouse_a", "warehouse_b", "loading_dock", "office", "lab",
    "hallway", "entrance", "storage_room", "rooftop",
]

OBJECTS = [
    "mug", "plate", "bottle", "box", "tool", "book", "laptop",
    "sensor_module", "battery_pack", "cable", "wrench", "package",
    "sample_container", "fire_extinguisher", "first_aid_kit",
]

# Task-name templates per category; placeholders are {obj}, {loc} and {loc2}.
TASK_TEMPLATES = {
    "manipulation": [
        "pick up {obj} from {loc}",
        "place {obj} on counter in {loc}",
        "grasp {obj} and carry to {loc}",
        "lift {obj} from shelf in {loc}",
    ],
    "navigation": [
        "navigate to {loc}",
        "patrol {loc} perimeter",
        "move to {loc} for inspection",
        "drive to {loc} waypoint",
    ],
    "sensing": [
        "inspect {loc} for anomalies",
        "scan {obj} in {loc}",
        "observe {loc} environment",
        "detect obstacles in {loc}",
    ],
    "mixed": [
        "pick up {obj} from {loc} and deliver to {loc2}",
        "navigate to {loc} then inspect {obj}",
        "scan {loc} and pick up any {obj} found",
    ],
}

# Task names drawn from when fabricating past task events.
HISTORY_TASK_NAMES = [
    "pick up mug",
    "navigate to kitchen",
    "inspect warehouse_a",
    "place box on counter",
    "patrol hallway",
]

# Required capabilities for the single-category task templates. "mixed" tasks
# are resolved from the rendered task name in generate_task_requests().
_CAPS_BY_CATEGORY: dict[str, tuple[str, ...]] = {
    "manipulation": ("manipulation",),
    "navigation": ("navigation",),
    "sensing": ("sensing",),
}


# ---------------------------------------------------------------------------
# Scenario builders
# ---------------------------------------------------------------------------

def make_capability(name: str, category: str, success_rate: float = 0.9) -> RobotCapability:
    """Build a RobotCapability with a unique id and a random average execution time.

    ``success_rate`` is clamped to the range [0.1, 1.0].
    """
    return RobotCapability(
        capability_id=f"cap_{name}_{uuid.uuid4().hex[:6]}",
        name=name,
        category=category,
        success_rate=max(0.1, min(1.0, success_rate)),
        avg_execution_time=random.uniform(5.0, 30.0),
    )


def make_robot(
    robot_id: str,
    robot_type: str,
    cap_categories: list[str],
    sensors: dict[str, str],
    *,
    battery: float | None = None,
    state: RobotState = RobotState.IDLE,
    location: str | None = None,
) -> EmbodimentProfile:
    """Build a robot profile with one capability per category and random physical specs.

    Args:
        robot_id: Identifier for the robot.
        robot_type: Type label (e.g. "drone").
        cap_categories: Capability categories; one capability is created for each.
        sensors: Sensor name -> spec mapping, stored as-is.
        battery: Battery percentage; drawn uniformly from [20, 100] when None.
        state: Initial robot state.
        location: Current location; a random entry of LOCATIONS when falsy.
    """
    capabilities = {}
    for cat in cap_categories:
        cap = make_capability(cat, cat, success_rate=random.uniform(0.6, 0.99))
        capabilities[cap.capability_id] = cap

    return EmbodimentProfile(
        robot_id=robot_id,
        robot_type=robot_type,
        mass_kg=random.uniform(5.0, 80.0),
        height_m=random.uniform(0.3, 1.8),
        max_speed_m_s=random.uniform(0.5, 3.0),
        battery_capacity_wh=random.uniform(50.0, 500.0),
        sensors=sensors,
        capabilities=capabilities,
        current_state=state,
        battery_pct=battery if battery is not None else random.uniform(20.0, 100.0),
        location=location or random.choice(LOCATIONS),
    )


def make_scene(location: str, n_objects: int = 3) -> SceneGraph:
    """Build a scene graph at ``location`` holding up to ``n_objects`` distinct objects.

    Objects are sampled without replacement from OBJECTS, keyed by object name,
    and placed at random positions with an identity orientation.
    """
    now = datetime.now(timezone.utc)
    objects = {}
    selected = random.sample(OBJECTS, min(n_objects, len(OBJECTS)))
    for obj_name in selected:
        lm_id = f"lm_{obj_name}_{uuid.uuid4().hex[:4]}"
        objects[obj_name] = SemanticLandmark(
            landmark_id=lm_id,
            name=obj_name,
            pose=Pose(
                position=Vector3D(
                    x=random.uniform(-5, 5),
                    y=random.uniform(-5, 5),
                    z=random.uniform(0, 2),
                ),
                orientation=Quaternion(x=0, y=0, z=0, w=1),
                timestamp=now,
            ),
            category="object",
        )

    return SceneGraph(
        scene_id=f"scene_{location}_{uuid.uuid4().hex[:6]}",
        timestamp=now,
        robot_id="observer",
        location_name=location,
        objects=objects,
    )


def make_task_history(
    robot_ids: list[str],
    n_events: int = 5,
) -> list[TaskEvent]:
    """Fabricate ``n_events`` past task events for random robots in ``robot_ids``.

    Each event starts 0.5-6 h before now and lasts 10-120 s. About 80% succeed;
    failed events complete at most two of their planned actions.

    Returns:
        Events sorted oldest-first by start time, so the tail of the list holds
        the most recent events.
    """
    now = datetime.now(timezone.utc)
    timed_events: list[tuple[datetime, TaskEvent]] = []
    for _ in range(n_events):
        robot_id = random.choice(robot_ids)
        start = now - timedelta(hours=random.uniform(0.5, 6.0))
        end = start + timedelta(seconds=random.uniform(10, 120))
        success = random.random() > 0.2
        task_name = random.choice(HISTORY_TASK_NAMES)
        # Draw the remaining fields in the same order as before so a fixed seed
        # still consumes the random stream identically.
        target_location = random.choice(LOCATIONS)
        target_objects = [random.choice(OBJECTS)] if random.random() > 0.5 else []
        actions_planned = random.randint(1, 5)
        if success:
            actions_completed = actions_planned
        else:
            actions_completed = random.randint(0, min(actions_planned, 2))

        timed_events.append((start, TaskEvent(
            event_id=f"evt_{uuid.uuid4().hex[:8]}",
            task_name=task_name,
            robot_id=robot_id,
            start_time=start,
            end_time=end,
            status=TaskStatus.COMPLETED if success else TaskStatus.FAILED,
            success=success,
            target_location=target_location,
            target_objects=target_objects,
            actions_planned=actions_planned,
            actions_completed=actions_completed,
        )))

    # Start offsets are random, so without sorting the list order says nothing
    # about recency. Sort so "last N events" really means "most recent N".
    timed_events.sort(key=lambda pair: pair[0])
    return [event for _, event in timed_events]


def generate_task_requests(
    n_tasks: int,
    *,
    with_dependencies: bool = False,
) -> list[TaskRequest]:
    """Generate ``n_tasks`` random task requests with ids ``task_000``, ``task_001``, ...

    Args:
        n_tasks: Number of requests to create.
        with_dependencies: When true, each task after the first has a 40%
            chance to depend on one earlier task. Dependencies always point
            backwards, so the dependency graph is acyclic.
    """
    requests: list[TaskRequest] = []
    for i in range(n_tasks):
        cat = random.choice(["manipulation", "navigation", "sensing", "mixed"])
        template = random.choice(TASK_TEMPLATES[cat])
        loc = random.choice(LOCATIONS)
        loc2 = random.choice([other for other in LOCATIONS if other != loc])
        obj = random.choice(OBJECTS)
        task_name = template.format(obj=obj, loc=loc, loc2=loc2)

        if cat == "mixed":
            # Mixed templates that mention picking need manipulation; the rest sense.
            if "pick" in task_name:
                caps: tuple[str, ...] = ("manipulation", "navigation")
            else:
                caps = ("sensing", "navigation")
        else:
            caps = _CAPS_BY_CATEGORY.get(cat, ())

        dep_ids: tuple[str, ...] = ()
        if with_dependencies and i > 0 and random.random() > 0.6:
            dep_idx = random.randint(0, i - 1)
            dep_ids = (requests[dep_idx].task_id,)

        requests.append(TaskRequest(
            task_id=f"task_{i:03d}",
            task_name=task_name,
            required_capabilities=caps,
            target_location=loc,
            target_objects=(obj,) if random.random() > 0.3 else (),
            priority=random.randint(0, 3),
            dependency_ids=dep_ids,
        ))
    return requests


def build_scenario(
    n_robots: int = 3,
    n_tasks: int = 4,
    *,
    include_offline: bool = False,
    include_low_battery: bool = False,
    with_dependencies: bool = False,
    include_history: bool = True,
    include_scenes: bool = True,
) -> tuple[STEMMemoryState, list[TaskRequest]]:
    """Build a complete scenario with robots, tasks, history, and scenes.

    Args:
        n_robots: Team size; robots are named ``robot_00``, ``robot_01``, ...
        n_tasks: Number of task requests to generate.
        include_offline: Mark the last robot OFFLINE.
        include_low_battery: Give the first robot a 3-8% battery.
        with_dependencies: Allow tasks to depend on earlier tasks.
        include_history: Attach 2-8 fabricated past task events.
        include_scenes: Attach scene graphs for 3 random locations.

    Returns:
        ``(memory_state, task_requests)``.
    """
    robots = {}
    robot_ids = []
    for i in range(n_robots):
        rtype, caps, sensors = random.choice(ROBOT_TYPES)
        rid = f"robot_{i:02d}"
        robot_state = RobotState.IDLE
        battery = None
        if include_offline and i == n_robots - 1:
            robot_state = RobotState.OFFLINE
        if include_low_battery and i == 0:
            battery = random.uniform(3.0, 8.0)
        robots[rid] = make_robot(
            rid, rtype, caps, sensors,
            battery=battery, state=robot_state,
        )
        robot_ids.append(rid)

    scenes = {}
    if include_scenes:
        for loc in random.sample(LOCATIONS, min(3, len(LOCATIONS))):
            sg = make_scene(loc)
            scenes[sg.scene_id] = sg

    if include_history:
        history = make_task_history(robot_ids, n_events=random.randint(2, 8))
    else:
        history = []

    task_requests = generate_task_requests(n_tasks, with_dependencies=with_dependencies)

    memory_state = STEMMemoryState(
        robot_profiles=robots,
        scenes=scenes,
        task_history=history,
    )
    return memory_state, task_requests
# ---------------------------------------------------------------------------
# Format as instruction-tuning examples
# ---------------------------------------------------------------------------

SYSTEM_PROMPT = """You are AGORA, a multi-robot task planner.
Given the current team state and task requests, assign each task to the best robot.

Consider:
- Robot capabilities (manipulation, navigation, sensing)
- Battery levels (low battery robots should get fewer tasks)
- Location proximity (prefer robots already near the task location)
- Recent failures (avoid re-assigning failed tasks to the same robot)
- Task dependencies (respect ordering constraints)
- Load balancing (distribute tasks evenly)

Respond with a JSON object containing:
- "assignments": {robot_id: [task_ids]}
- "reasoning": brief explanation of allocation decisions
- "unassigned": [{"task_id": ..., "reason": ...}] for tasks that couldn't be assigned"""


def state_to_context(state: STEMMemoryState, tasks: list[TaskRequest]) -> str:
    """Render a STEM state and its task requests as the user prompt.

    Sections, in order: robots (sorted by robot id), known scenes (if any),
    up to five failed history events (if any), then the task requests.
    """
    lines = ["## Team State\n"]
    for rid, profile in sorted(state.robot_profiles.items(), key=lambda item: item[0]):
        caps = ", ".join(c.category for c in profile.capabilities.values())
        lines.append(
            f"- **{rid}** ({profile.robot_type}): "
            f"battery={profile.battery_pct:.0f}%, state={profile.current_state.value}, "
            f"location={profile.location}, capabilities=[{caps}], "
            f"speed={profile.max_speed_m_s:.1f}m/s"
        )

    if state.scenes:
        lines.append("\n## Known Scenes\n")
        for sg in state.scenes.values():
            obj_names = ", ".join(sorted(sg.objects.keys()))
            lines.append(f"- {sg.location_name}: objects=[{obj_names}]")

    recent_failures = [e for e in state.task_history if not e.success]
    if recent_failures:
        lines.append("\n## Recent Failures\n")
        # "Recent" means the last five by position, so this relies on
        # task_history being ordered oldest-first.
        for evt in recent_failures[-5:]:
            lines.append(f"- {evt.robot_id} failed '{evt.task_name}' at {evt.target_location}")

    lines.append("\n## Task Requests\n")
    for task in tasks:
        caps_str = ", ".join(task.required_capabilities) if task.required_capabilities else "any"
        deps = ""
        if task.dependency_ids:
            deps = ", depends_on=[" + ", ".join(task.dependency_ids) + "]"
        objs = ""
        if task.target_objects:
            objs = ", objects=[" + ", ".join(task.target_objects) + "]"
        lines.append(
            f"- **{task.task_id}**: \"{task.task_name}\" "
            f"(caps=[{caps_str}], location={task.target_location}, "
            f"priority={task.priority}{deps}{objs})"
        )

    lines.append("\nAssign each task to the best robot. Return JSON.")
    return "\n".join(lines)


def allocation_to_response(
    plan,
    tasks: list[TaskRequest],
) -> str:
    """Serialize a plan as the expected assistant response (indented JSON).

    Reads ``plan.assignments`` (robot id -> objects with ``task_id``),
    ``plan.unassigned_tasks``, ``plan.failure_reasons`` and ``plan.reasoning``.
    ``tasks`` is not used; it is kept for interface compatibility.
    """
    assignments = {
        robot_id: [a.task_id for a in task_assignments]
        for robot_id, task_assignments in plan.assignments.items()
    }
    unassigned = [
        {
            "task_id": task.task_id,
            "reason": plan.failure_reasons.get(task.task_id, "no suitable robot"),
        }
        for task in plan.unassigned_tasks
    ]
    response = {
        "assignments": assignments,
        "reasoning": plan.reasoning,
        "unassigned": unassigned,
    }
    return json.dumps(response, indent=2)


def _make_example(state, tasks, plan) -> dict:
    """Build one chat-format record (system/user/assistant) for a scenario and its plan."""
    return {
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": state_to_context(state, tasks)},
            {"role": "assistant", "content": allocation_to_response(plan, tasks)},
        ],
    }


def _classify_plan(plan) -> str:
    """Return "fully_assigned", "partial" or "empty" for a plan.

    A plan counts as "partial" only if at least one task was actually assigned.
    Robots mapped to empty lists do not count.
    """
    if not plan.unassigned_tasks:
        return "fully_assigned"
    if any(plan.assignments.values()):
        return "partial"
    return "empty"


def _eval_path(train_path: str) -> str:
    """Derive the eval-split path from the training path. Never returns ``train_path``.

    ``foo_train.jsonl`` -> ``foo_eval.jsonl``; any other name gets an ``_eval``
    suffix before its extension (``foo.jsonl`` -> ``foo_eval.jsonl``).
    """
    path = Path(train_path)
    suffix = "_train.jsonl"
    if path.name.endswith(suffix):
        name = path.name[: -len(suffix)] + "_eval.jsonl"
    else:
        name = f"{path.stem}_eval{path.suffix}"
    return str(path.with_name(name))


# ---------------------------------------------------------------------------
# Main generation loop
# ---------------------------------------------------------------------------

DEFAULT_OUTPUT_PATH = "/mnt/artifacts-datai/logs/project_agora/planning_train.jsonl"


@dataclass
class DatasetStats:
    """Aggregate statistics over the generated training split."""

    total: int = 0  # examples written
    fully_assigned: int = 0  # plans with no unassigned tasks
    partial: int = 0  # some tasks assigned, some not
    empty: int = 0  # unassigned tasks and nothing assigned at all
    with_deps: int = 0  # scenarios generated with dependencies enabled
    with_failures: int = 0  # scenarios whose history has a failed event
    avg_robots: float = 0.0  # mean team size
    avg_tasks: float = 0.0  # mean number of task requests


async def generate_dataset(
    n_examples: int = 5000,
    output_path: str | None = None,
    seed: int = 42,
    n_eval: int = 200,
) -> DatasetStats:
    """Generate the training split plus a small eval split.

    Args:
        n_examples: Number of training examples to write.
        output_path: Training JSONL path; defaults to ``DEFAULT_OUTPUT_PATH``.
            The eval split goes next to it (``*_train.jsonl`` -> ``*_eval.jsonl``,
            otherwise ``<stem>_eval<suffix>``), so it never overwrites it.
        seed: Seed for the global ``random`` module; the eval split reseeds
            with ``seed + 1``.
        n_eval: Number of eval examples to write.

    Returns:
        Statistics over the training split only.
    """
    random.seed(seed)
    output_path = DEFAULT_OUTPUT_PATH if output_path is None else str(output_path)
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    brain = Brain(BrainConfig(mllm_provider="heuristic"))
    stats = DatasetStats()
    total_robots = 0
    total_tasks = 0

    with open(output_path, "w", encoding="utf-8") as f:
        for i in range(n_examples):
            n_robots = random.randint(2, 6)
            n_tasks = random.randint(1, 8)
            with_deps = random.random() > 0.4
            include_offline = random.random() > 0.7
            include_low_battery = random.random() > 0.6
            include_history = random.random() > 0.2

            state, tasks = build_scenario(
                n_robots=n_robots,
                n_tasks=n_tasks,
                include_offline=include_offline,
                include_low_battery=include_low_battery,
                with_dependencies=with_deps,
                include_history=include_history,
            )
            plan = await brain.plan_team_tasks(state, tasks)
            f.write(json.dumps(_make_example(state, tasks, plan)) + "\n")

            stats.total += 1
            total_robots += n_robots
            total_tasks += n_tasks
            outcome = _classify_plan(plan)
            if outcome == "fully_assigned":
                stats.fully_assigned += 1
            elif outcome == "partial":
                stats.partial += 1
            else:
                stats.empty += 1
            if with_deps:
                stats.with_deps += 1
            if any(not e.success for e in state.task_history):
                stats.with_failures += 1

            if (i + 1) % 500 == 0:
                print(f"  Generated {i + 1}/{n_examples} examples...")

    stats.avg_robots = total_robots / max(stats.total, 1)
    stats.avg_tasks = total_tasks / max(stats.total, 1)

    # Small eval split with its own seed. Each record's user prompt is rendered
    # from that eval scenario's own state.
    eval_path = _eval_path(output_path)
    random.seed(seed + 1)
    with open(eval_path, "w", encoding="utf-8") as f:
        for _ in range(n_eval):
            n_robots = random.randint(2, 6)
            n_tasks = random.randint(2, 6)
            state, tasks = build_scenario(
                n_robots=n_robots,
                n_tasks=n_tasks,
                with_dependencies=random.random() > 0.5,
                include_offline=random.random() > 0.7,
                include_low_battery=random.random() > 0.6,
            )
            plan = await brain.plan_team_tasks(state, tasks)
            f.write(json.dumps(_make_example(state, tasks, plan)) + "\n")

    print(f"\nDataset saved to: {output_path}")
    print(f"Eval split saved to: {eval_path}")
    return stats


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Generate AGORA planning training data")
    parser.add_argument("--n-examples", type=int, default=5000, help="Number of training examples")
    parser.add_argument("--n-eval", type=int, default=200, help="Number of eval examples")
    parser.add_argument("--seed", type=int, default=42, help="Random seed")
    parser.add_argument(
        "--output",
        default=DEFAULT_OUTPUT_PATH,
        help="Output JSONL path",
    )
    args = parser.parse_args()

    stats = asyncio.run(generate_dataset(
        n_examples=args.n_examples,
        output_path=args.output,
        seed=args.seed,
        n_eval=args.n_eval,
    ))

    print("\n=== Dataset Statistics ===")
    print(f"Total examples:       {stats.total}")
    print(f"Fully assigned:       {stats.fully_assigned}")
    print(f"Partial:              {stats.partial}")
    print(f"Empty (nothing assigned): {stats.empty}")
    print(f"With dependencies:    {stats.with_deps}")
    print(f"With failures:        {stats.with_failures}")
    print(f"Avg robots/scene:     {stats.avg_robots:.1f}")
    print(f"Avg tasks/scene:      {stats.avg_tasks:.1f}")