"""
Agent Trajectory Generator & Decision Point Labeling

Generates synthetic agent trajectories with labeled decision points.
Also loads real trajectories from AgentBench / ToolBench.
"""
|
|
| import json |
| import random |
| from pathlib import Path |
| from dataclasses import dataclass, field |
| from typing import List, Optional |
|
|
|
|
@dataclass
class TrajectoryStep:
    """A single step of an agent trajectory.

    Holds the step's role label and text, a flag saying whether the step
    is labeled as a decision point, and the tokens of the text.
    """

    # Role label; the synthetic generator in this file emits "action",
    # "thought", "error" and "observation".
    role: str
    # Full text of the step.
    content: str
    # True when the step is labeled as a decision point.
    is_decision_point: bool = False
    # Tokens of ``content``; every instance gets its own empty list by default.
    tokens: List[str] = field(default_factory=list)
|
|
|
|
@dataclass
class AgentTrajectory:
    """A task description together with the ordered steps taken for it.

    ``total_tokens`` and ``decision_point_tokens`` are stored counters
    supplied by whoever builds the trajectory; they are not recomputed
    from ``steps``.
    """

    # Free-text description of the task the trajectory belongs to.
    task: str
    # Steps in the order they were produced.
    steps: List["TrajectoryStep"]
    # Number of tokens across all steps.
    total_tokens: int = 0
    # Number of tokens that belong to decision-point steps.
    decision_point_tokens: int = 0

    @property
    def decision_ratio(self) -> float:
        """Return the fraction of tokens that belong to decision points.

        Returns:
            ``decision_point_tokens / total_tokens`` as a float, or ``0.0``
            when ``total_tokens`` is zero (avoids a division by zero).
        """
        if self.total_tokens == 0:
            # Always hand back a float so callers see one consistent type.
            return 0.0
        return self.decision_point_tokens / self.total_tokens
|
|
|
|
| |
# Marker phrases grouped by decision-point category. The synthetic generator
# draws a category and then one phrase from it; label_decision_points() looks
# for any of these phrases (case-insensitively) in the text it is given.
DECISION_PATTERNS = {
    "tool_call": [
        "I need to call",
        "Let me use",
        "Action:",
        "Tool:",
        "API call:",
        "function_call",
        "execute(",
        "search(",
        "query(",
    ],
    "plan_revision": [
        "Let me reconsider",
        "Actually,",
        "Wait,",
        "On second thought",
        "I should change",
        "New plan:",
        "Revised approach:",
        "Instead,",
    ],
    "error_recovery": [
        "Error:",
        "Failed:",
        "Exception:",
        "Traceback",
        "retry",
        "That didn't work",
        "Let me try another",
        "fallback",
    ],
    "state_update": [
        "Result:",
        "Output:",
        "The answer is",
        "Found:",
        "Updated:",
        "Status:",
        "Observation:",
    ],
}
|
|
| |
# Filler openers used for steps that are NOT decision points. Each entry is
# passed through str.format(i=<step index>) by the synthetic generator, so
# "{i}" below is a placeholder rather than literal text.
ROUTINE_PATTERNS = [
    "Let me think about this...",
    "Looking at the data...",
    "Based on the context...",
    "The document mentions...",
    "According to the passage...",
    "Step {i}: ",
    "Processing...",
    "Reading the input...",
]
|
|
|
|
def _render_step(index, task, is_decision):
    """Return the ``(role, content)`` pair for one synthetic step."""
    if not is_decision:
        # Routine step: a filler opener followed by a fixed sentence.
        opener = random.choice(ROUTINE_PATTERNS).format(i=index)
        return "thought", f"{opener} analyzing information related to {task}."

    # Decision step: draw the category first, then a marker phrase from it.
    kind = random.choice(list(DECISION_PATTERNS.keys()))
    marker = random.choice(DECISION_PATTERNS[kind])
    if kind == "tool_call":
        return "action", f"{marker} search_api('query about {task} step {index}')"
    if kind == "plan_revision":
        return "thought", f"{marker} the approach for step {index} needs adjustment."
    if kind == "error_recovery":
        return "error", f"{marker} step {index} encountered an issue. Trying alternative."
    # Any remaining category (currently "state_update") is an observation.
    return "observation", f"{marker} step {index} yielded new information for {task}."


def generate_synthetic_trajectory(
    num_steps=20, decision_ratio=0.15, task="multi-hop QA"
) -> AgentTrajectory:
    """Build one random trajectory whose steps carry decision-point labels.

    Args:
        num_steps: Number of steps to generate.
        decision_ratio: Probability that any single step is a decision point.
        task: Task description; it is woven into the generated step text.

    Returns:
        An ``AgentTrajectory`` whose token counters are based on
        whitespace-split step contents.
    """
    built_steps = []
    for index in range(num_steps):
        # One draw decides whether this step is a decision point.
        flagged = random.random() < decision_ratio
        role, text = _render_step(index, task, flagged)
        built_steps.append(
            TrajectoryStep(
                role=role,
                content=text,
                is_decision_point=flagged,
                tokens=text.split(),
            )
        )

    token_total = sum(len(step.tokens) for step in built_steps)
    decision_total = sum(
        len(step.tokens) for step in built_steps if step.is_decision_point
    )
    return AgentTrajectory(
        task=task,
        steps=built_steps,
        total_tokens=token_total,
        decision_point_tokens=decision_total,
    )
|
|
|
|
def generate_dataset(num_trajectories=1000, save_path=None):
    """Generate a dataset of labeled synthetic agent trajectories.

    Each trajectory gets a random task from a fixed list, between 10 and 40
    steps, and a per-step decision probability between 0.08 and 0.25.
    Summary statistics are printed to stdout.

    Args:
        num_trajectories: How many trajectories to generate. Zero or a
            negative number yields an empty dataset.
        save_path: Optional output file. When given, missing parent
            directories are created and the dataset is written as JSON
            (token lists are not included in the file).

    Returns:
        The list of generated trajectories.
    """
    tasks = [
        "multi-hop question answering",
        "code debugging with tool use",
        "web navigation and form filling",
        "API orchestration pipeline",
        "database query planning",
        "research paper analysis",
    ]

    trajectories = []
    for _ in range(num_trajectories):
        task = random.choice(tasks)
        num_steps = random.randint(10, 40)
        ratio = random.uniform(0.08, 0.25)
        trajectories.append(generate_synthetic_trajectory(num_steps, ratio, task))

    print(f"Generated {len(trajectories)} trajectories")
    if trajectories:
        # Averages and min/max are undefined for an empty dataset, so the
        # statistics are only reported when something was generated.
        ratios = [t.decision_ratio for t in trajectories]
        avg_ratio = sum(ratios) / len(ratios)
        avg_steps = sum(len(t.steps) for t in trajectories) / len(trajectories)
        print(f"Avg decision ratio: {avg_ratio:.2%}")
        print(f"Min/Max ratio: {min(ratios):.2%} / {max(ratios):.2%}")
        print(f"Avg steps: {avg_steps:.1f}")

    if save_path:
        save_path = Path(save_path)
        save_path.parent.mkdir(parents=True, exist_ok=True)
        data = [
            {
                "task": t.task,
                "total_tokens": t.total_tokens,
                "decision_point_tokens": t.decision_point_tokens,
                "decision_ratio": t.decision_ratio,
                "steps": [
                    {
                        "role": s.role,
                        "content": s.content,
                        "is_decision_point": s.is_decision_point,
                    }
                    for s in t.steps
                ],
            }
            for t in trajectories
        ]
        with open(save_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        print(f"Saved to {save_path}")

    return trajectories
|
|
|
|
def label_decision_points(text: str) -> List[bool]:
    """Flag every whitespace-separated token of ``text`` as decision point or not.

    A token is flagged when any phrase from ``DECISION_PATTERNS`` occurs,
    case-insensitively, as a substring of the token's context window. The
    window is the space-joined run of up to three preceding tokens, the
    token itself, and up to two following tokens.

    Args:
        text: Raw text to label.

    Returns:
        One boolean per token, in token order; empty for blank input.
    """
    words = text.split()
    # Lower-case every marker phrase once rather than once per token.
    markers = [
        phrase.lower()
        for group in DECISION_PATTERNS.values()
        for phrase in group
    ]

    flags = []
    for pos in range(len(words)):
        # NOTE(review): the window reaches 3 tokens back but only 2 forward;
        # confirm the asymmetry is intended.
        window = " ".join(words[max(0, pos - 3):pos + 3]).lower()
        flags.append(any(marker in window for marker in markers))
    return flags
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build 2000 synthetic trajectories and store them
    # as JSON under data/.
    trajectories = generate_dataset(
        save_path="data/agent_trajectories.json",
        num_trajectories=2000,
    )
|
|