project_agora / scripts /generate_planning_data.py
ilessio-aiflowlab's picture
[AGORA] Full export: pth + safetensors + ONNX + TRT fp16 + TRT fp32
12d70dc verified
#!/usr/bin/env python3
"""Generate synthetic multi-robot planning data for fine-tuning a planner LLM.
Uses AGORA's heuristic DecisionEngine to produce ground-truth task allocations
across diverse team compositions, task sets, and failure scenarios. Outputs a
JSONL dataset suitable for instruction-tuning with TRL/SFT.
Output: /mnt/artifacts-datai/logs/project_agora/planning_train.jsonl
"""
from __future__ import annotations
import asyncio
import json
import random
import sys
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
# Ensure the package is importable
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))
from anima_agora.control.brain import Brain, BrainConfig
from anima_agora.control.contracts import TaskRequest
from anima_agora.memory.stem_core import (
EmbodimentProfile,
Pose,
Quaternion,
RobotCapability,
RobotState,
SceneGraph,
SemanticLandmark,
STEMMemoryState,
TaskEvent,
TaskStatus,
Vector3D,
)
# ---------------------------------------------------------------------------
# Constants for scenario generation
# ---------------------------------------------------------------------------
# Robot archetypes sampled when a team is built. Each entry is a
# (robot_type, capability categories, sensors mapping) triple.
ROBOT_TYPES = [
    (
        "manipulator",
        ["manipulation"],
        {"arm": "6DOF", "gripper": "parallel"},
    ),
    (
        "mobile_base",
        ["navigation"],
        {"lidar": "2D", "camera": "RGB"},
    ),
    (
        "drone",
        ["navigation", "sensing"],
        {"camera": "RGBD", "gps": "RTK"},
    ),
    (
        "humanoid",
        ["manipulation", "navigation"],
        {"camera": "stereo", "imu": "9DOF"},
    ),
    (
        "agv",
        ["navigation"],
        {"lidar": "3D", "ultrasonic": "array"},
    ),
    (
        "inspection_bot",
        ["sensing", "navigation"],
        {"thermal": "FLIR", "camera": "4K"},
    ),
]
# Named places used for robot positions, scenes, and task targets.
# Order matters: seeded random.choice/random.sample index into this list.
LOCATIONS = [
    "kitchen",
    "living_room",
    "bedroom",
    "bathroom",
    "garage",
    "warehouse_a",
    "warehouse_b",
    "loading_dock",
    "office",
    "lab",
    "hallway",
    "entrance",
    "storage_room",
    "rooftop",
]
# Object names used for scene landmarks and task targets.
# Order matters: seeded random.choice/random.sample index into this list.
OBJECTS = [
    "mug",
    "plate",
    "bottle",
    "box",
    "tool",
    "book",
    "laptop",
    "sensor_module",
    "battery_pack",
    "cable",
    "wrench",
    "package",
    "sample_container",
    "fire_extinguisher",
    "first_aid_kit",
]
# Task-name templates grouped by category. Placeholders: {obj} is an object
# name, {loc} the target location, {loc2} a second, different location.
TASK_TEMPLATES = {
    # Single-capability categories.
    "manipulation": [
        "pick up {obj} from {loc}",
        "place {obj} on counter in {loc}",
        "grasp {obj} and carry to {loc}",
        "lift {obj} from shelf in {loc}",
    ],
    "navigation": [
        "navigate to {loc}",
        "patrol {loc} perimeter",
        "move to {loc} for inspection",
        "drive to {loc} waypoint",
    ],
    "sensing": [
        "inspect {loc} for anomalies",
        "scan {obj} in {loc}",
        "observe {loc} environment",
        "detect obstacles in {loc}",
    ],
    # Multi-step tasks that combine several kinds of work.
    "mixed": [
        "pick up {obj} from {loc} and deliver to {loc2}",
        "navigate to {loc} then inspect {obj}",
        "scan {loc} and pick up any {obj} found",
    ],
}
# ---------------------------------------------------------------------------
# Scenario builders
# ---------------------------------------------------------------------------
def make_capability(name: str, category: str, success_rate: float = 0.9) -> RobotCapability:
    """Create a ``RobotCapability`` with a fresh id and a random execution time.

    Args:
        name: Capability name; also embedded in the generated id.
        category: Capability category stored on the result.
        success_rate: Desired success rate; clamped into ``[0.1, 1.0]``.

    Returns:
        A capability whose id is ``cap_<name>_<6 hex chars>`` and whose
        ``avg_execution_time`` is drawn uniformly from ``[5.0, 30.0]``.
    """
    id_suffix = uuid.uuid4().hex[:6]
    bounded_rate = max(0.1, min(1.0, success_rate))
    exec_time = random.uniform(5.0, 30.0)
    return RobotCapability(
        capability_id=f"cap_{name}_{id_suffix}",
        name=name,
        category=category,
        success_rate=bounded_rate,
        avg_execution_time=exec_time,
    )
def make_robot(
    robot_id: str,
    robot_type: str,
    cap_categories: list[str],
    sensors: dict[str, str],
    *,
    battery: float | None = None,
    state: RobotState = RobotState.IDLE,
    location: str | None = None,
) -> EmbodimentProfile:
    """Assemble an ``EmbodimentProfile`` with randomised physical attributes.

    Args:
        robot_id: Identifier stored on the profile.
        robot_type: Robot type label stored on the profile.
        cap_categories: One capability is created per category, using the
            category as both name and category, with a success rate drawn
            uniformly from ``[0.6, 0.99]``.
        sensors: Sensor mapping passed through to the profile unchanged.
        battery: Battery percentage; when ``None`` a value is drawn
            uniformly from ``[20.0, 100.0]``.
        state: Robot state stored on the profile.
        location: Location name; when falsy a random entry of ``LOCATIONS``
            is used.

    Returns:
        The populated profile, with capabilities keyed by capability id.
    """
    # Random draws below are kept in a fixed order so seeded runs stay stable.
    built = [
        make_capability(category, category, success_rate=random.uniform(0.6, 0.99))
        for category in cap_categories
    ]
    capabilities = {capability.capability_id: capability for capability in built}

    mass = random.uniform(5.0, 80.0)
    height = random.uniform(0.3, 1.8)
    top_speed = random.uniform(0.5, 3.0)
    capacity = random.uniform(50.0, 500.0)
    battery_pct = random.uniform(20.0, 100.0) if battery is None else battery
    where = location if location else random.choice(LOCATIONS)

    return EmbodimentProfile(
        robot_id=robot_id,
        robot_type=robot_type,
        mass_kg=mass,
        height_m=height,
        max_speed_m_s=top_speed,
        battery_capacity_wh=capacity,
        sensors=sensors,
        capabilities=capabilities,
        current_state=state,
        battery_pct=battery_pct,
        location=where,
    )
def make_scene(location: str, n_objects: int = 3) -> SceneGraph:
    """Build a ``SceneGraph`` for ``location`` populated with random objects.

    Args:
        location: Location name stored on the scene and embedded in its id.
        n_objects: Number of distinct objects to sample from ``OBJECTS``;
            capped at ``len(OBJECTS)``.

    Returns:
        A scene whose ``objects`` maps each sampled object name to a
        ``SemanticLandmark`` at a random position with identity orientation.
    """
    stamp = datetime.now(tz=timezone.utc)
    sample_size = min(n_objects, len(OBJECTS))

    landmarks = {}
    for name in random.sample(OBJECTS, sample_size):
        landmark_id = f"lm_{name}_{uuid.uuid4().hex[:4]}"
        position = Vector3D(
            x=random.uniform(-5, 5),
            y=random.uniform(-5, 5),
            z=random.uniform(0, 2),
        )
        pose = Pose(
            position=position,
            orientation=Quaternion(x=0, y=0, z=0, w=1),
            timestamp=stamp,
        )
        landmarks[name] = SemanticLandmark(
            landmark_id=landmark_id,
            name=name,
            pose=pose,
            category="object",
        )

    scene_id = f"scene_{location}_{uuid.uuid4().hex[:6]}"
    return SceneGraph(
        scene_id=scene_id,
        timestamp=stamp,
        robot_id="observer",
        location_name=location,
        objects=landmarks,
    )
def make_task_history(
    robot_ids: list[str],
    n_events: int = 5,
) -> list[TaskEvent]:
    """Fabricate past ``TaskEvent`` records for the given robots.

    Args:
        robot_ids: Robots to attribute events to; one is chosen per event.
        n_events: Number of events to create.

    Returns:
        Events in generation order (not sorted by time). Each started
        0.5-6 hours before now and lasted 10-120 seconds; an event
        succeeds when ``random.random() > 0.2``.
    """
    task_names = [
        "pick up mug", "navigate to kitchen", "inspect warehouse_a",
        "place box on counter", "patrol hallway",
    ]
    reference = datetime.now(tz=timezone.utc)
    history = []
    for _ in range(n_events):
        # Keep the draw order fixed so seeded runs reproduce the same data.
        robot_id = random.choice(robot_ids)
        started = reference - timedelta(hours=random.uniform(0.5, 6.0))
        finished = started + timedelta(seconds=random.uniform(10, 120))
        succeeded = random.random() > 0.2
        task_name = random.choice(task_names)
        event_id = f"evt_{uuid.uuid4().hex[:8]}"
        target_location = random.choice(LOCATIONS)

        target_objects = []
        if random.random() > 0.5:
            target_objects.append(random.choice(OBJECTS))

        planned = random.randint(1, 5)
        if succeeded:
            completed = planned
        else:
            # Failed events report at most two completed actions.
            completed = random.randint(0, min(planned, 2))

        history.append(TaskEvent(
            event_id=event_id,
            task_name=task_name,
            robot_id=robot_id,
            start_time=started,
            end_time=finished,
            status=TaskStatus.COMPLETED if succeeded else TaskStatus.FAILED,
            success=succeeded,
            target_location=target_location,
            target_objects=target_objects,
            actions_planned=planned,
            actions_completed=completed,
        ))
    return history
def generate_task_requests(
    n_tasks: int,
    *,
    with_dependencies: bool = False,
) -> list[TaskRequest]:
    """Create ``n_tasks`` random ``TaskRequest`` objects from ``TASK_TEMPLATES``.

    Args:
        n_tasks: Number of requests to generate; ids are ``task_000``,
            ``task_001``, ...
        with_dependencies: When true, each task after the first depends on
            one randomly chosen earlier task whenever
            ``random.random() > 0.6``.

    Returns:
        The requests in creation order.
    """
    categories = ["manipulation", "navigation", "sensing", "mixed"]
    # Single-capability categories require exactly their own capability.
    single_caps = {
        "manipulation": ("manipulation",),
        "navigation": ("navigation",),
        "sensing": ("sensing",),
    }

    generated: list[TaskRequest] = []
    for index in range(n_tasks):
        # Keep the draw order fixed so seeded runs reproduce the same data.
        category = random.choice(categories)
        template = random.choice(TASK_TEMPLATES[category])
        loc = random.choice(LOCATIONS)
        other_locations = [place for place in LOCATIONS if place != loc]
        loc2 = random.choice(other_locations)
        obj = random.choice(OBJECTS)
        task_name = template.format(obj=obj, loc=loc, loc2=loc2)

        if category == "mixed":
            # NOTE(review): "scan {loc} and pick up any {obj} found" contains
            # "pick", so it gets manipulation+navigation and no sensing —
            # confirm this is intended.
            if "pick" in task_name:
                caps = ("manipulation", "navigation")
            else:
                caps = ("sensing", "navigation")
        else:
            caps = single_caps.get(category, ())

        dep_ids: tuple[str, ...] = ()
        if with_dependencies and index > 0 and random.random() > 0.6:
            earlier = generated[random.randint(0, index - 1)]
            dep_ids = (earlier.task_id,)

        # The object named in the task is listed as a target ~70% of the time.
        target_objects = (obj,) if random.random() > 0.3 else ()
        priority = random.randint(0, 3)

        generated.append(TaskRequest(
            task_id=f"task_{index:03d}",
            task_name=task_name,
            required_capabilities=caps,
            target_location=loc,
            target_objects=target_objects,
            priority=priority,
            dependency_ids=dep_ids,
        ))
    return generated
def build_scenario(
    n_robots: int = 3,
    n_tasks: int = 4,
    *,
    include_offline: bool = False,
    include_low_battery: bool = False,
    with_dependencies: bool = False,
    include_history: bool = True,
    include_scenes: bool = True,
) -> tuple[STEMMemoryState, list[TaskRequest]]:
    """Build a complete scenario with robots, tasks, history, and scenes.

    Args:
        n_robots: Team size; robots are named ``robot_00``, ``robot_01``, ...
        n_tasks: Number of task requests to generate.
        include_offline: Mark the last robot as ``RobotState.OFFLINE``.
        include_low_battery: Give the first robot a battery level drawn
            uniformly from ``[3.0, 8.0]``.
        with_dependencies: Forwarded to ``generate_task_requests``.
        include_history: Attach 2-8 fabricated past task events.
        include_scenes: Attach scenes for up to three sampled locations.

    Returns:
        ``(memory_state, task_requests)`` for one planning problem.
    """
    profiles = {}
    last_index = n_robots - 1
    for index in range(n_robots):
        robot_type, categories, sensors = random.choice(ROBOT_TYPES)
        robot_id = f"robot_{index:02d}"

        if include_offline and index == last_index:
            robot_state = RobotState.OFFLINE
        else:
            robot_state = RobotState.IDLE

        low_battery = include_low_battery and index == 0
        battery = random.uniform(3.0, 8.0) if low_battery else None

        profiles[robot_id] = make_robot(
            robot_id, robot_type, categories, sensors,
            battery=battery, state=robot_state,
        )
    robot_ids = list(profiles)

    scenes = {}
    if include_scenes:
        for place in random.sample(LOCATIONS, min(3, len(LOCATIONS))):
            scene = make_scene(place)
            scenes[scene.scene_id] = scene

    history = []
    if include_history:
        history = make_task_history(robot_ids, n_events=random.randint(2, 8))

    task_requests = generate_task_requests(n_tasks, with_dependencies=with_dependencies)
    memory = STEMMemoryState(
        robot_profiles=profiles,
        scenes=scenes,
        task_history=history,
    )
    return memory, task_requests
# ---------------------------------------------------------------------------
# Format as instruction-tuning examples
# ---------------------------------------------------------------------------
# System message placed first in every generated chat example. It lists the
# allocation criteria and names the JSON keys the assistant reply must
# contain ("assignments", "reasoning", "unassigned"). This is runtime text
# that ends up in the dataset: any edit changes the training data.
SYSTEM_PROMPT = """You are AGORA, a multi-robot task planner. Given the current team state and task requests, assign each task to the best robot. Consider:
- Robot capabilities (manipulation, navigation, sensing)
- Battery levels (low battery robots should get fewer tasks)
- Location proximity (prefer robots already near the task location)
- Recent failures (avoid re-assigning failed tasks to the same robot)
- Task dependencies (respect ordering constraints)
- Load balancing (distribute tasks evenly)
Respond with a JSON object containing:
- "assignments": {robot_id: [task_ids]}
- "reasoning": brief explanation of allocation decisions
- "unassigned": [task_ids that couldn't be assigned, with reasons]"""
def state_to_context(state: STEMMemoryState, tasks: list[TaskRequest]) -> str:
    """Render the team state and task requests as the user prompt.

    Sections, in order: "Team State" (robots sorted by id), "Known Scenes"
    (only when scenes exist), "Recent Failures" (only when the history has
    failed events; the last five in list order), "Task Requests", and a
    closing instruction line.

    Args:
        state: Memory state providing ``robot_profiles``, ``scenes`` and
            ``task_history``.
        tasks: Requests to list in the prompt.

    Returns:
        The prompt text with sections joined by newlines.
    """
    out = ["## Team State\n"]
    for robot_id in sorted(state.robot_profiles):
        robot = state.robot_profiles[robot_id]
        categories = ", ".join(cap.category for cap in robot.capabilities.values())
        out.append(
            f"- **{robot_id}** ({robot.robot_type}): "
            f"battery={robot.battery_pct:.0f}%, state={robot.current_state.value}, "
            f"location={robot.location}, capabilities=[{categories}], "
            f"speed={robot.max_speed_m_s:.1f}m/s"
        )

    if state.scenes:
        out.append("\n## Known Scenes\n")
        for scene in state.scenes.values():
            names = ", ".join(sorted(scene.objects.keys()))
            out.append(f"- {scene.location_name}: objects=[{names}]")

    failures = [event for event in state.task_history if not event.success]
    if failures:
        out.append("\n## Recent Failures\n")
        out.extend(
            f"- {event.robot_id} failed '{event.task_name}' at {event.target_location}"
            for event in failures[-5:]
        )

    out.append("\n## Task Requests\n")
    for request in tasks:
        if request.required_capabilities:
            caps_text = ", ".join(request.required_capabilities)
        else:
            caps_text = "any"

        # Optional suffixes appear only when the request carries the data.
        deps_text = ""
        if request.dependency_ids:
            deps_text = f", depends_on=[{', '.join(request.dependency_ids)}]"
        objs_text = ""
        if request.target_objects:
            objs_text = f", objects=[{', '.join(request.target_objects)}]"

        out.append(
            f"- **{request.task_id}**: \"{request.task_name}\" "
            f"(caps=[{caps_text}], location={request.target_location}, "
            f"priority={request.priority}{deps_text}{objs_text})"
        )

    out.append("\nAssign each task to the best robot. Return JSON.")
    return "\n".join(out)
def allocation_to_response(
    plan,
    tasks: list[TaskRequest],
) -> str:
    """Serialise a task plan as the assistant's JSON reply.

    Args:
        plan: Object exposing ``assignments`` (robot id -> assigned items
            with a ``task_id``), ``unassigned_tasks``, ``failure_reasons``
            (task id -> reason) and ``reasoning``.
        tasks: Accepted for interface compatibility; not read here.

    Returns:
        A JSON string (2-space indent) with keys ``assignments``,
        ``reasoning`` and ``unassigned``. Unassigned tasks without a
        recorded reason get ``"no suitable robot"``.
    """
    per_robot = {
        robot_id: [item.task_id for item in assigned]
        for robot_id, assigned in plan.assignments.items()
    }
    leftovers = [
        {
            "task_id": pending.task_id,
            "reason": plan.failure_reasons.get(pending.task_id, "no suitable robot"),
        }
        for pending in plan.unassigned_tasks
    ]
    payload = {
        "assignments": per_robot,
        "reasoning": plan.reasoning,
        "unassigned": leftovers,
    }
    return json.dumps(payload, indent=2)
# ---------------------------------------------------------------------------
# Main generation loop
# ---------------------------------------------------------------------------
@dataclass
class DatasetStats:
    """Summary counters for one run of the dataset generator."""

    # Number of training examples written.
    total: int = 0
    # Examples whose plan left no task unassigned.
    fully_assigned: int = 0
    # Examples with unassigned tasks and a non-empty assignments mapping.
    partial: int = 0
    # Examples with unassigned tasks and an empty assignments mapping.
    empty: int = 0
    # Examples generated with task dependencies enabled.
    with_deps: int = 0
    # Examples whose task history contains at least one failed event.
    with_failures: int = 0
    # Mean number of robots per training example.
    avg_robots: float = 0.0
    # Mean number of tasks per training example.
    avg_tasks: float = 0.0
def _derive_eval_path(train_path: Path) -> Path:
    """Return a sibling path for the eval split that never equals ``train_path``."""
    train_suffix = "_train.jsonl"
    name = train_path.name
    if name.endswith(train_suffix):
        eval_name = f"{name[: -len(train_suffix)]}_eval.jsonl"
    else:
        # Without the conventional suffix a plain string replace would be a
        # no-op and the eval split would overwrite the training file.
        eval_name = f"{train_path.stem}_eval{train_path.suffix}"
    return train_path.with_name(eval_name)


def _build_example(state: STEMMemoryState, tasks: list[TaskRequest], plan) -> dict:
    """Pair one scenario's prompt with the planner's answer as a chat example."""
    return {
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": state_to_context(state, tasks)},
            {"role": "assistant", "content": allocation_to_response(plan, tasks)},
        ],
    }


async def generate_dataset(
    n_examples: int = 5000,
    output_path: str | None = None,
    seed: int = 42,
    n_eval: int = 200,
) -> DatasetStats:
    """Generate the training dataset and a small eval split as JSONL files.

    Each line is a chat example (system / user / assistant messages) whose
    assistant turn is the plan returned by the heuristic ``Brain`` for a
    randomly built scenario.

    Args:
        n_examples: Number of training examples to write.
        output_path: Destination of the training file; defaults to
            ``/mnt/artifacts-datai/logs/project_agora/planning_train.jsonl``.
            Parent directories are created as needed.
        seed: Seed for the training split; the eval split uses ``seed + 1``.
        n_eval: Number of eval examples to write.

    Returns:
        Statistics describing the training split only.
    """
    random.seed(seed)
    if output_path is None:
        output_path = "/mnt/artifacts-datai/logs/project_agora/planning_train.jsonl"
    train_path = Path(output_path)
    train_path.parent.mkdir(parents=True, exist_ok=True)
    eval_path = _derive_eval_path(train_path)

    brain = Brain(BrainConfig(mllm_provider="heuristic"))
    stats = DatasetStats()
    total_robots = 0
    total_tasks = 0

    with open(train_path, "w", encoding="utf-8") as f:
        for i in range(n_examples):
            # Draw order is fixed so a given seed always yields the same data.
            n_robots = random.randint(2, 6)
            n_tasks = random.randint(1, 8)
            with_deps = random.random() > 0.4
            include_offline = random.random() > 0.7
            include_low_battery = random.random() > 0.6
            include_history = random.random() > 0.2
            state, tasks = build_scenario(
                n_robots=n_robots,
                n_tasks=n_tasks,
                include_offline=include_offline,
                include_low_battery=include_low_battery,
                with_dependencies=with_deps,
                include_history=include_history,
            )
            plan = await brain.plan_team_tasks(state, tasks)
            f.write(json.dumps(_build_example(state, tasks, plan)) + "\n")

            stats.total += 1
            total_robots += n_robots
            total_tasks += n_tasks
            if not plan.unassigned_tasks:
                stats.fully_assigned += 1
            elif plan.assignments:
                stats.partial += 1
            else:
                stats.empty += 1
            if with_deps:
                stats.with_deps += 1
            if any(not e.success for e in state.task_history):
                stats.with_failures += 1
            if (i + 1) % 500 == 0:
                print(f" Generated {i + 1}/{n_examples} examples...")

    stats.avg_robots = total_robots / max(n_examples, 1)
    stats.avg_tasks = total_tasks / max(n_examples, 1)

    # Also save a small eval split, drawn from a different seed.
    random.seed(seed + 1)
    with open(eval_path, "w", encoding="utf-8") as f:
        for _ in range(n_eval):
            n_robots = random.randint(2, 6)
            n_tasks = random.randint(2, 6)
            state, tasks = build_scenario(
                n_robots=n_robots,
                n_tasks=n_tasks,
                with_dependencies=random.random() > 0.5,
                include_offline=random.random() > 0.7,
                include_low_battery=random.random() > 0.6,
            )
            plan = await brain.plan_team_tasks(state, tasks)
            # The prompt must come from this eval scenario, not from a
            # variable left over from the training loop.
            f.write(json.dumps(_build_example(state, tasks, plan)) + "\n")

    print(f"\nDataset saved to: {output_path}")
    print(f"Eval split saved to: {eval_path}")
    return stats
if __name__ == "__main__":
    # Command-line entry point: parse options, generate the data, print a summary.
    import argparse

    cli = argparse.ArgumentParser(description="Generate AGORA planning training data")
    cli.add_argument("--n-examples", type=int, default=5000, help="Number of training examples")
    cli.add_argument("--seed", type=int, default=42, help="Random seed")
    cli.add_argument(
        "--output",
        default="/mnt/artifacts-datai/logs/project_agora/planning_train.jsonl",
        help="Output JSONL path",
    )
    options = cli.parse_args()

    run = generate_dataset(
        n_examples=options.n_examples,
        output_path=options.output,
        seed=options.seed,
    )
    stats = asyncio.run(run)

    summary = (
        f"Total examples: {stats.total}",
        f"Fully assigned: {stats.fully_assigned}",
        f"Partial: {stats.partial}",
        f"Empty (no robots): {stats.empty}",
        f"With dependencies: {stats.with_deps}",
        f"With failures: {stats.with_failures}",
        f"Avg robots/scene: {stats.avg_robots:.1f}",
        f"Avg tasks/scene: {stats.avg_tasks:.1f}",
    )
    print("\n=== Dataset Statistics ===")
    for row in summary:
        print(row)