| |
| """ |
Task Generator - Generate training tasks for each agent

For each agent in the dataset, generates up to TASKS_PER_AGENT (currently 11)
unique tasks that this specific agent would handle best. Creates a synthetic
dataset for training.

Steps:
1. Merge all domain files from agents_norm into unified dataset files
2. For each agent, generate up to TASKS_PER_AGENT tasks via LLM (a reply with
   fewer than 5 valid tasks is counted as an error)
3. Save results with progress tracking and resume capability
| """ |
|
|
| import argparse |
| import asyncio |
| import json |
|
|
| |
| |
| |
| import os |
| import traceback as tb |
| from dataclasses import dataclass, field |
| from datetime import datetime |
| from pathlib import Path |
|
|
| from langchain_core.messages import HumanMessage, SystemMessage |
| from langchain_openai import ChatOpenAI |
| from rich.console import Console |
| from rich.panel import Panel |
| from rich.progress import ( |
| BarColumn, |
| Progress, |
| SpinnerColumn, |
| TaskProgressColumn, |
| TextColumn, |
| TimeElapsedColumn, |
| TimeRemainingColumn, |
| ) |
| from rich.table import Table |
|
|
# LLM connection defaults; each can be overridden by environment variable
# here and again by the matching CLI flag in main().
DEFAULT_API_KEY = os.environ.get("LLM_API_KEY", "")
DEFAULT_BASE_URL = os.environ.get("LLM_BASE_URL", "https://api.openai.com/v1")
DEFAULT_MODEL = os.environ.get("LLM_MODEL", "gpt-4")

# Generation tuning.
TEMPERATURE = 0.7  # sampling temperature passed to the chat model
MAX_RETRIES = 3  # extra LLM attempts after the first; reassigned from --retries
DEFAULT_TIMEOUT = 300  # seconds allowed per LLM request
DEFAULT_PARALLEL = 10  # concurrent agent requests
TASKS_PER_AGENT = 11  # matches the 11-item plan in the system prompts

# Filesystem layout, resolved relative to this script's directory.
SCRIPT_DIR = Path(__file__).parents[0]
BASE_DIR = SCRIPT_DIR.parent
DATASET_DIR = BASE_DIR.joinpath("dataset")
AGENTS_NORM_DIR = DATASET_DIR.joinpath("agents_norm")
OUTPUT_DIR = DATASET_DIR.joinpath("agent_tasks")
PROGRESS_FILE = SCRIPT_DIR.joinpath(".task_generator_progress.json")
LOG_FILE = SCRIPT_DIR.joinpath("task_generator.log")
|
|
| |
# Dataset folder names (under AGENTS_NORM_DIR) processed when no --folders
# argument is given.
DATASET_FOLDERS = ["agents_eng"]


# Shared rich console used for all terminal output in this script.
console = Console()
|
|
|
|
| |
| |
| |
|
|
|
|
def log(msg: str, level: str = "INFO"):
    """Append one line ``[<iso timestamp>] [<level>] <msg>`` to LOG_FILE."""
    line = "[{}] [{}] {}\n".format(datetime.now().isoformat(), level, msg)
    with LOG_FILE.open("a", encoding="utf-8") as handle:
        handle.write(line)
|
|
|
|
def log_llm(msg: str):
    """Append an LLM debug line to LOG_FILE, tagged with the ``LLM`` level.

    This produces exactly the same line format as ``log`` and simply uses the
    fixed level name "LLM".
    """
    log(msg, "LLM")
|
|
|
|
| |
| |
| |
|
|
|
|
@dataclass
class AgentEntry:
    """One agent profile together with the dataset folder it was read from."""

    agent_id: str
    display_name: str
    persona: str
    description: str
    role_id: str
    domain: str
    tools: list[str]  # tool names; to_dict() returns this same list object
    dataset: str

    def to_dict(self) -> dict:
        """Return all fields as a plain dict, keyed in declaration order."""
        keys = (
            "agent_id",
            "display_name",
            "persona",
            "description",
            "role_id",
            "domain",
            "tools",
            "dataset",
        )
        return {key: getattr(self, key) for key in keys}
|
|
|
|
@dataclass
class AgentTasks:
    """The generated task prompts for a single agent of one dataset."""

    agent_id: str
    dataset: str
    tasks: list[str] = field(default_factory=list)  # a fresh list per instance

    def to_dict(self) -> dict:
        """Serialise to a dict with keys ``agent_id``, ``dataset`` and ``tasks``."""
        return dict(agent_id=self.agent_id, dataset=self.dataset, tasks=self.tasks)
|
|
|
|
| |
| |
| |
|
|
|
|
def load_progress() -> dict:
    """Load the resume checkpoint, falling back to an empty one.

    Returns:
        A dict with the keys ``merged_datasets`` (list of folder names),
        ``completed_agents`` (folder name -> list of agent ids) and ``stats``
        (``total_agents`` / ``tasks_generated`` / ``errors`` counters).
        Keys missing from an older or hand-edited checkpoint are filled in
        with defaults, so callers can index them directly.

    An unreadable checkpoint is still not fatal, but it is no longer silently
    ignored: a WARN entry is logged before starting from an empty state.
    """
    default = {
        "merged_datasets": [],
        "completed_agents": {},
        "stats": {"total_agents": 0, "tasks_generated": 0, "errors": 0},
    }
    if not PROGRESS_FILE.exists():
        return default

    try:
        with open(PROGRESS_FILE, encoding="utf-8") as f:
            loaded = json.load(f)
    except (ValueError, OSError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        log(f"Ignoring unreadable progress file {PROGRESS_FILE}: {e}", "WARN")
        return default

    if not isinstance(loaded, dict):
        log(f"Ignoring malformed progress file {PROGRESS_FILE}: top level is not an object", "WARN")
        return default

    loaded.setdefault("merged_datasets", [])
    loaded.setdefault("completed_agents", {})
    stats = loaded.setdefault("stats", {})
    for key, value in default["stats"].items():
        stats.setdefault(key, value)
    return loaded
|
|
|
|
def save_progress(progress: dict):
    """Atomically write the checkpoint dict to PROGRESS_FILE as indented JSON.

    The data goes to a ``.tmp`` sibling first and is then moved over the real
    file with ``os.replace``. save_progress runs frequently and from interrupt
    handlers. A plain in-place write cut off halfway would leave truncated
    JSON, which load_progress discards, so the whole run would restart from
    scratch. With the rename, readers only ever see the old or the new
    complete file. The temp file is removed if the write fails.
    """
    tmp_file = PROGRESS_FILE.with_name(PROGRESS_FILE.name + ".tmp")
    try:
        with open(tmp_file, "w", encoding="utf-8") as f:
            json.dump(progress, f, ensure_ascii=False, indent=2)
        os.replace(tmp_file, PROGRESS_FILE)
    except BaseException:
        # Includes KeyboardInterrupt: never leave a half-written temp behind.
        tmp_file.unlink(missing_ok=True)
        raise
|
|
|
|
def clear_progress():
    """Delete the checkpoint file; it is not an error if it does not exist."""
    PROGRESS_FILE.unlink(missing_ok=True)
|
|
|
|
| |
| |
| |
|
|
|
|
def merge_dataset(folder_name: str) -> list[AgentEntry]:
    """Collect every agent from the per-domain JSON files of one dataset folder.

    Files ``AGENTS_NORM_DIR/<folder_name>/*.json`` are read in sorted name
    order. Each file's ``domain`` value (or, failing that, the file stem) is
    stamped on its agents, and agents without an ``agent_id`` are dropped. A
    file that fails to load or parse is logged and skipped; agents already
    collected from it before the failure are kept. A missing folder yields
    an empty list and a WARN log entry.
    """
    source_dir = AGENTS_NORM_DIR / folder_name
    if not source_dir.exists():
        log(f"Folder '{folder_name}' not found", "WARN")
        return []

    domain_files = sorted(source_dir.glob("*.json"))
    collected: list[AgentEntry] = []

    for path in domain_files:
        try:
            with open(path, encoding="utf-8") as fh:
                payload = json.load(fh)

            file_domain = payload.get("domain", path.stem)
            for raw in payload.get("agents", []):
                entry = AgentEntry(
                    agent_id=raw.get("agent_id", ""),
                    display_name=raw.get("display_name", ""),
                    persona=raw.get("persona", ""),
                    description=raw.get("description", ""),
                    role_id=raw.get("role_id", ""),
                    domain=file_domain,
                    tools=raw.get("tools", []),
                    dataset=folder_name,
                )
                if entry.agent_id:
                    collected.append(entry)
        except Exception as e:
            # Best effort per file: one bad file must not abort the merge.
            log(f"Error reading {path}: {e}", "ERROR")

    log(f"Merged {len(collected)} agents from {folder_name} ({len(domain_files)} files)")
    return collected
|
|
|
|
def save_merged_dataset(folder_name: str, agents: list[AgentEntry]) -> Path:
    """Persist a dataset's merged agents as ``OUTPUT_DIR/<folder_name>/dataset.json``.

    The file records the folder name, a merge timestamp, the agent count and
    every agent's ``to_dict()``. Returns the path that was written.
    """
    target = OUTPUT_DIR / folder_name / "dataset.json"
    target.parent.mkdir(parents=True, exist_ok=True)

    payload = {
        "dataset": folder_name,
        "merged_at": datetime.now().isoformat(),
        "total_agents": len(agents),
        "agents": [entry.to_dict() for entry in agents],
    }
    with target.open("w", encoding="utf-8") as fh:
        json.dump(payload, fh, ensure_ascii=False, indent=2)

    log(f"Saved merged dataset to {target}")
    return target
|
|
|
|
def load_merged_dataset(folder_name: str) -> list[AgentEntry]:
    """Read back the agents written by save_merged_dataset for *folder_name*.

    Returns an empty list when ``OUTPUT_DIR/<folder_name>/dataset.json`` does
    not exist. Entries without an ``agent_id`` are skipped, and a missing
    per-agent ``dataset`` value falls back to *folder_name*. Read and parse
    errors propagate to the caller.
    """
    source = OUTPUT_DIR / folder_name / "dataset.json"
    if not source.exists():
        return []

    with source.open(encoding="utf-8") as fh:
        payload = json.load(fh)

    entries = [
        AgentEntry(
            agent_id=raw.get("agent_id", ""),
            display_name=raw.get("display_name", ""),
            persona=raw.get("persona", ""),
            description=raw.get("description", ""),
            role_id=raw.get("role_id", ""),
            domain=raw.get("domain", ""),
            tools=raw.get("tools", []),
            dataset=raw.get("dataset", folder_name),
        )
        for raw in payload.get("agents", [])
    ]
    return [entry for entry in entries if entry.agent_id]
|
|
|
|
| |
| |
| |
|
|
|
|
def create_llm(api_key: str, base_url: str, model: str) -> ChatOpenAI:
    """Build the chat client: given endpoint and model, TEMPERATURE, 4000 max tokens."""
    settings = {
        "model": model,
        "base_url": base_url,
        "api_key": api_key,
        "temperature": TEMPERATURE,
        "max_tokens": 4000,
    }
    return ChatOpenAI(**settings)
|
|
|
|
| def extract_json(text: str) -> dict | None: |
| """Extract JSON from LLM response.""" |
| text = text.strip() |
|
|
| |
| try: |
| return json.loads(text) |
| except json.JSONDecodeError: |
| pass |
|
|
| |
| if text.startswith("```"): |
| text = text[text.find("\n") + 1 :] |
| text = text.removesuffix("```") |
| text = text.strip() |
|
|
| |
| start = text.find("{") |
| end = text.rfind("}") |
| if start != -1 and end > start: |
| try: |
| return json.loads(text[start : end + 1]) |
| except json.JSONDecodeError: |
| pass |
|
|
| return None |
|
|
|
|
async def call_llm(llm: ChatOpenAI, system_prompt: str, user_prompt: str, timeout: int) -> dict | None:
    """Send one chat request and return its parsed JSON object.

    Makes up to ``MAX_RETRIES + 1`` attempts. MAX_RETRIES is read at call
    time, so a later reassignment of the module global takes effect. An
    attempt fails on timeout, on any exception from the client, or when the
    reply contains no non-empty JSON object. Each failed attempt except the
    last is followed by an exponential backoff sleep of 2, 4, 8, ... seconds.
    Every step is logged via ``log_llm`` under a short random request id.

    Args:
        llm: Chat model client.
        system_prompt: System message content.
        user_prompt: Human message content.
        timeout: Seconds allowed for each attempt.

    Returns:
        The parsed dict, or None once all attempts have failed.
    """
    import secrets  # local import: only used for the log correlation id

    messages = [SystemMessage(content=system_prompt), HumanMessage(content=user_prompt)]
    # Random id instead of the wall-clock second: with parallel requests many
    # calls start within the same second and would share one id in the log.
    req_id = secrets.token_hex(3)

    for attempt in range(MAX_RETRIES + 1):
        start = datetime.now()
        try:
            log_llm(f"[{req_id}] REQUEST attempt={attempt}/{MAX_RETRIES} prompt_len={len(user_prompt)}")
            response = await asyncio.wait_for(llm.ainvoke(messages), timeout=timeout)
            elapsed = (datetime.now() - start).total_seconds()
            log_llm(f"[{req_id}] RESPONSE elapsed={elapsed:.2f}s len={len(response.content)}")
            log_llm(f"[{req_id}] CONTENT: {response.content[:500]}...")

            result = extract_json(response.content)
            # Only a non-empty JSON object is usable by callers (they call .get()).
            if isinstance(result, dict) and result:
                log_llm(f"[{req_id}] PARSED OK: tasks={len(result.get('tasks', []))}")
                return result

            log_llm(f"[{req_id}] JSON parse failed, will retry")

        except (TimeoutError, asyncio.TimeoutError):
            # Before Python 3.11, asyncio.wait_for raises asyncio.TimeoutError,
            # which is not the builtin TimeoutError; from 3.11 they are one class.
            elapsed = (datetime.now() - start).total_seconds()
            log_llm(f"[{req_id}] TIMEOUT attempt={attempt} after {elapsed:.0f}s")
        except Exception as e:
            elapsed = (datetime.now() - start).total_seconds()
            error_type = type(e).__name__
            error_text = str(e)
            error_msg = error_text[:200]

            # Heuristic classification of the message text (logging only).
            if "524" in error_text or "timeout" in error_text.lower():
                log_llm(f"[{req_id}] SERVER TIMEOUT (524) attempt={attempt} after {elapsed:.0f}s")
            elif "500" in error_text or "502" in error_text or "503" in error_text:
                log_llm(f"[{req_id}] SERVER ERROR attempt={attempt} after {elapsed:.0f}s: {error_msg}")
            else:
                log_llm(f"[{req_id}] ERROR attempt={attempt} elapsed={elapsed:.2f}s: {error_type}: {error_msg}")

        if attempt < MAX_RETRIES:
            wait_time = 2 ** (attempt + 1)
            log_llm(f"[{req_id}] Waiting {wait_time}s before retry...")
            await asyncio.sleep(wait_time)

    log_llm(f"[{req_id}] FAILED after {MAX_RETRIES + 1} attempts")
    return None
|
|
|
|
| |
| |
| |
|
|
# System prompt template for English task generation.
# It is a str.format template: build_task_prompt fills {num_tasks}. The literal
# JSON braces in the OUTPUT FORMAT example are doubled ({{ }}) so format()
# emits single braces.
# NOTE(review): the numbered DIVERSITY REQUIREMENTS plan (items 1-11) is
# written for exactly 11 tasks (TASKS_PER_AGENT); it will read inconsistently
# if that constant changes.
TASK_GENERATION_SYSTEM_PROMPT_EN = """You create realistic user requests for AI agent training data.

Your goal: generate {num_tasks} diverse, realistic user messages that THIS SPECIFIC AGENT would handle better than any other agent.

## CRITICAL: WHY THIS AGENT?

Each task MUST be one where THIS SPECIFIC AGENT excels over all others.

Before writing each task, ask yourself:
- "Would a general assistant handle this equally well?" → If YES, task is BAD
- "Would a different specialist agent be better for this?" → If YES, task is BAD
- "Does this task specifically need THIS agent's unique expertise?" → Must be YES

Read the agent's full profile — name, persona, description, tools.
The task should leverage THIS agent's specific knowledge, skills, and character.

## TASK TYPES — BE CREATIVE!

Tasks are NOT just questions! They can be:
- **Direct requests**: "Book me a hotel in Paris for next week", "Draw me a logo for my cafe"
- **Emotional support**: "I'm feeling overwhelmed, can we talk?", "I failed my exam and don't know what to do"
- **Action tasks**: "Schedule a meeting", "Find and compare prices", "Translate this document"
- **Creative generation**: "Write a poem about...", "Design a workout plan", "Compose a melody"
- **Complex problems with real data**: Full math problems with numbers, code debugging with actual code, legal cases with specifics
- **Roleplay/simulation**: "Pretend you're interviewing me for a job", "Let's practice a sales pitch"

For META-AGENTS (orchestrator, router, planner, etc.):
- "Optimize this multi-agent communication graph"
- "Which agents should handle this complex request?"
- "Plan execution order for these 5 subtasks"
- "The previous agent failed, decide on fallback strategy"

## DIVERSITY REQUIREMENTS ({num_tasks} tasks)

1-2: Simple beginner questions
3-4: Complex analysis or comparison
5-6: Action/execution requests (do something, not just explain)
7-8: Creative or emotional tasks
9-10: Real-world problems with specific details
11: **SUPER HARD** — a genuinely difficult problem that might stump even the LLM
Examples: complex physics calculation with numbers, multi-step legal analysis,
intricate code optimization, advanced mathematical proof

## REALISTIC USER VOICE

Write as REAL users would — natural, messy, contextual:
- "hey can you help me with..." (casual)
- "I need this urgently for tomorrow..." (stressed)
- "So I've been thinking about this for a while..." (conversational)
- Include typos occasionally, incomplete thoughts, real emotions

## AVOID

❌ Generic: "Help me with music"
❌ Only questions — include ACTION requests
❌ Robotic: "Provide comprehensive analysis"
❌ Same type repeated
❌ Tasks any general assistant could do

## OUTPUT FORMAT
Return ONLY valid JSON with exactly {num_tasks} tasks:
{{
"tasks": [
"First user request...",
"Second user request...",
...
]
}}

No markdown, no explanations outside JSON."""
|
|
|
|
# System prompt template used when the task language is "ru".
# Despite the _RU suffix, the instructions themselves are in English. The
# Russian output language is requested by the user prompt that
# build_task_prompt builds ("Tasks in RUSSIAN language."). The text matches
# the EN template except for "Goal:" vs "Your goal:" in the second paragraph.
# It is a str.format template ({num_tasks}; JSON braces doubled as {{ }}).
TASK_GENERATION_SYSTEM_PROMPT_RU = """You create realistic user requests for AI agent training data.

Goal: generate {num_tasks} diverse, realistic user messages that THIS SPECIFIC AGENT would handle better than any other agent.

## CRITICAL: WHY THIS AGENT?

Each task MUST be one where THIS SPECIFIC AGENT excels over all others.

Before writing each task, ask yourself:
- "Would a general assistant handle this equally well?" → If YES, task is BAD
- "Would a different specialist agent be better for this?" → If YES, task is BAD
- "Does this task specifically need THIS agent's unique expertise?" → Must be YES

Read the agent's full profile — name, persona, description, tools.
The task should leverage THIS agent's specific knowledge, skills, and character.

## TASK TYPES — BE CREATIVE!

Tasks are NOT just questions! They can be:
- **Direct requests**: "Book me a hotel in Paris for next week", "Draw me a logo for my cafe"
- **Emotional support**: "I'm feeling overwhelmed, can we talk?", "I failed my exam and don't know what to do"
- **Action tasks**: "Schedule a meeting", "Find and compare prices", "Translate this document"
- **Creative generation**: "Write a poem about...", "Design a workout plan", "Compose a melody"
- **Complex problems with real data**: Full math problems with numbers, code debugging with actual code, legal cases with specifics
- **Roleplay/simulation**: "Pretend you're interviewing me for a job", "Let's practice a sales pitch"

For META-AGENTS (orchestrator, router, planner, etc.):
- "Optimize this multi-agent communication graph"
- "Which agents should handle this complex request?"
- "Plan execution order for these 5 subtasks"
- "The previous agent failed, decide on fallback strategy"

## DIVERSITY REQUIREMENTS ({num_tasks} tasks)

1-2: Simple beginner questions
3-4: Complex analysis or comparison
5-6: Action/execution requests (do something, not just explain)
7-8: Creative or emotional tasks
9-10: Real-world problems with specific details
11: **SUPER HARD** — a genuinely difficult problem that might stump even the LLM
Examples: complex physics calculation with numbers, multi-step legal analysis,
intricate code optimization, advanced mathematical proof

## REALISTIC USER VOICE

Write as REAL users would — natural, messy, contextual:
- "hey can you help me with..." (casual)
- "I need this urgently for tomorrow..." (stressed)
- "So I've been thinking about this for a while..." (conversational)
- Include typos occasionally, incomplete thoughts, real emotions

## AVOID

❌ Generic: "Help me with music"
❌ Only questions — include ACTION requests
❌ Robotic: "Provide comprehensive analysis"
❌ Same type repeated
❌ Tasks any general assistant could do

## OUTPUT FORMAT
Return ONLY valid JSON with exactly {num_tasks} tasks:
{{
"tasks": [
"First user request...",
"Second user request...",
...
]
}}

No markdown, no explanations outside JSON."""
|
|
|
|
def build_task_prompt(agent: AgentEntry, language: str) -> tuple[str, str]:
    """Return the ``(system_prompt, user_prompt)`` pair for one agent.

    ``language == "ru"`` selects the RU system template and asks for Russian
    tasks; any other value selects the EN template and asks for English. The
    user prompt lists the agent's name, persona, description and tools, then
    restates the "why this agent" requirement.
    """
    num_tasks = TASKS_PER_AGENT
    wants_russian = language == "ru"

    template = TASK_GENERATION_SYSTEM_PROMPT_RU if wants_russian else TASK_GENERATION_SYSTEM_PROMPT_EN
    language_line = "Tasks in RUSSIAN language." if wants_russian else "Tasks in ENGLISH."

    if agent.tools:
        tools_line = f"🔧 Tools available: {', '.join(agent.tools)}"
    else:
        tools_line = "🔧 No external tools (reasoning and knowledge only)"

    # f-strings (not bare values) so non-str profile fields render like before.
    user_lines = [
        f"# Agent: {agent.display_name}",
        "",
        f"{agent.persona}",
        "",
        f"{agent.description}",
        "",
        tools_line,
        "",
        "---",
        "",
        f"Generate {num_tasks} tasks where THIS agent would OUTPERFORM any other agent.",
        "",
        'For each task ask: "Why would I choose THIS agent over 1000 other AI specialists?"',
        "If no clear answer — rewrite the task to be more specific to this agent's unique expertise.",
        "",
        f"{language_line} Varying complexity and type. Natural language.",
        "Return only JSON.",
    ]

    return template.format(num_tasks=num_tasks), "\n".join(user_lines)
|
|
|
|
| |
| |
| |
|
|
|
|
async def generate_tasks_for_agent(
    agent: AgentEntry, llm: ChatOpenAI, language: str, timeout: int
) -> AgentTasks | None:
    """Ask the LLM for one agent's tasks and validate the reply.

    A reply is accepted only when its ``"tasks"`` value is a list that yields
    at least 5 usable tasks. A usable task is a string longer than 20
    characters after stripping surrounding whitespace. Exact duplicates are
    dropped, keeping the first occurrence and the original order. At most
    TASKS_PER_AGENT tasks are kept.

    Returns:
        The agent's AgentTasks, or None (after a WARN log entry) when the LLM
        call failed or the reply did not pass validation.
    """
    min_valid_tasks = 5  # fewer usable tasks than this counts as a failed generation

    system_prompt, user_prompt = build_task_prompt(agent, language)
    result = await call_llm(llm, system_prompt, user_prompt, timeout)

    if not result:
        log(f"Failed to generate tasks for {agent.agent_id}", "WARN")
        return None

    tasks = result.get("tasks", [])
    if not tasks:
        log(f"No tasks in response for {agent.agent_id}", "WARN")
        return None
    if not isinstance(tasks, list):
        # A string would be iterated char by char, and a dict's keys would be
        # accepted as tasks; neither is a usable reply.
        log(f"Malformed 'tasks' field for {agent.agent_id}: {type(tasks).__name__}", "WARN")
        return None

    stripped = (t.strip() for t in tasks if isinstance(t, str))
    # dict.fromkeys de-duplicates while preserving first-seen order.
    valid_tasks = list(dict.fromkeys(t for t in stripped if len(t) > 20))

    if len(valid_tasks) < min_valid_tasks:
        log(f"Too few valid tasks for {agent.agent_id}: {len(valid_tasks)}", "WARN")
        return None

    return AgentTasks(
        agent_id=agent.agent_id,
        dataset=agent.dataset,
        tasks=valid_tasks[:TASKS_PER_AGENT],
    )
|
|
|
|
def get_tasks_file_path(folder_name: str) -> Path:
    """Return ``OUTPUT_DIR/<folder_name>/tasks.json``, creating the folder if needed."""
    folder = OUTPUT_DIR.joinpath(folder_name)
    folder.mkdir(parents=True, exist_ok=True)
    return folder.joinpath("tasks.json")
|
|
|
|
def load_existing_tasks(folder_name: str) -> dict[str, list[str]]:
    """Load previously generated tasks for a dataset from its ``tasks.json``.

    Returns:
        Mapping agent_id -> list of task strings. An empty dict is returned
        when the file does not exist, or when it is unreadable or malformed.

    An unreadable or malformed file used to be ignored silently. The next
    save_all_tasks, which writes only the in-memory tasks, would then
    overwrite it and lose every earlier result. Now the error is logged, and
    the file is moved aside to ``tasks.json.corrupt-<timestamp>`` so it can
    be recovered by hand.
    """
    tasks_file = get_tasks_file_path(folder_name)
    if not tasks_file.exists():
        return {}

    try:
        with open(tasks_file, encoding="utf-8") as f:
            data = json.load(f)
        return {item["agent_id"]: item["tasks"] for item in data.get("agents", [])}
    except (ValueError, OSError, KeyError, TypeError, AttributeError) as e:
        # ValueError covers JSONDecodeError/UnicodeDecodeError. TypeError and
        # AttributeError come from valid JSON with an unexpected shape.
        backup = tasks_file.with_name(f"{tasks_file.name}.corrupt-{datetime.now():%Y%m%d-%H%M%S}")
        try:
            tasks_file.replace(backup)
            log(f"Unreadable tasks file {tasks_file} ({e}); moved to {backup}", "ERROR")
        except OSError as move_err:
            log(f"Unreadable tasks file {tasks_file} ({e}); could not move it aside: {move_err}", "ERROR")
        return {}
|
|
|
|
def save_all_tasks(folder_name: str, all_tasks: dict[str, list[str]], agents: list[AgentEntry]):
    """Write every agent's tasks for one dataset to its unified ``tasks.json``.

    Agents are emitted in the order of *agents*, and agents without an entry
    in *all_tasks* are omitted. The JSON goes to a ``.tmp`` sibling first and
    is then moved over the target with ``os.replace``. This function runs
    periodically and from the interrupt handler, so a write cut off halfway
    would otherwise leave a truncated tasks file. Now the previous file
    survives intact.

    Args:
        folder_name: Dataset folder name; selects the output file.
        all_tasks: Mapping agent_id -> generated task strings.
        agents: The dataset's merged agents, supplying metadata and order.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    tasks_file = get_tasks_file_path(folder_name)

    agents_with_tasks = [
        {
            "agent_id": agent.agent_id,
            "display_name": agent.display_name,
            "domain": agent.domain,
            "role_id": agent.role_id,
            "tasks": all_tasks[agent.agent_id],
        }
        for agent in agents
        if agent.agent_id in all_tasks
    ]

    data = {
        "dataset": folder_name,
        "generated_at": datetime.now().isoformat(),
        "total_agents": len(agents_with_tasks),
        "total_tasks": sum(len(a["tasks"]) for a in agents_with_tasks),
        "agents": agents_with_tasks,
    }

    tmp_file = tasks_file.with_name(tasks_file.name + ".tmp")
    try:
        with open(tmp_file, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        os.replace(tmp_file, tasks_file)
    except BaseException:
        # Includes KeyboardInterrupt: never leave a half-written temp behind.
        tmp_file.unlink(missing_ok=True)
        raise

    log(f"Saved {len(agents_with_tasks)} agents with tasks to {tasks_file}")
|
|
|
|
def get_language_for_dataset(folder_name: str) -> str:
    """Return the task language code for a dataset folder.

    Every dataset is currently treated as English, so *folder_name* is not
    consulted. build_task_prompt also understands "ru", which this function
    would have to return for Russian tasks.
    """
    del folder_name  # intentionally unused for now
    return "en"
|
|
|
|
| |
| |
| |
|
|
|
|
async def process_dataset(
    folder_name: str,
    llm: ChatOpenAI,
    progress: dict,
    parallel: int,
    timeout: int,
    save_every: int = 50,
) -> dict:
    """Merge one dataset folder and generate tasks for its pending agents.

    On the first run for *folder_name*, the per-domain files are merged and
    saved. On later runs the merged file is reloaded instead. Agents already
    listed in ``progress["completed_agents"][folder_name]`` are skipped, and
    at most *parallel* LLM requests run at once.

    Durability: the tasks file and the checkpoint are always written together.
    This happens every *save_every* finished agents and once more on any exit
    (normal completion, Ctrl+C/cancellation, or an unexpected error). As a
    result the checkpoint never lists an agent whose tasks are not on disk.
    Previously the checkpoint was saved after every agent but the tasks only
    every 50 agents. An interruption therefore marked agents as done whose
    tasks were lost, and those agents were never regenerated.

    Args:
        folder_name: Dataset folder under AGENTS_NORM_DIR.
        llm: Chat model client used for generation.
        progress: Mutable checkpoint dict (see load_progress); updated in place.
        parallel: Maximum number of concurrent agent requests.
        timeout: Per-request LLM timeout in seconds.
        save_every: Number of finished agents between flushes to disk.

    Returns:
        ``progress["stats"]`` (the same dict object, updated in place).
    """
    language = get_language_for_dataset(folder_name)
    lang_label = "Russian" if language == "ru" else "English"

    # Step 1: merge per-domain files once; reuse the merged file on resume.
    if folder_name not in progress["merged_datasets"]:
        console.print(f"\n[bold cyan]📦 Merging {folder_name}...[/bold cyan]")
        agents = merge_dataset(folder_name)
        if not agents:
            console.print(f"[yellow]⚠ No agents found in {folder_name}, skipping[/yellow]")
            return progress["stats"]

        save_merged_dataset(folder_name, agents)
        progress["merged_datasets"].append(folder_name)
        save_progress(progress)
        console.print(f"[green]✓ Merged {len(agents)} agents from {folder_name}[/green]")
    else:
        console.print(f"\n[bold cyan]📦 Loading merged {folder_name}...[/bold cyan]")
        agents = load_merged_dataset(folder_name)
        console.print(f"[green]✓ Loaded {len(agents)} agents[/green]")

    if not agents:
        return progress["stats"]

    # Step 2: generate tasks for every agent not yet in the checkpoint.
    all_tasks = load_existing_tasks(folder_name)

    completed = set(progress["completed_agents"].get(folder_name, []))
    pending_agents = [a for a in agents if a.agent_id not in completed]

    if not pending_agents:
        console.print(f"[green]✓ All {len(agents)} agents already processed[/green]")
        return progress["stats"]

    console.print(f"[cyan]🤖 Generating tasks for {len(pending_agents)} agents ({lang_label})...[/cyan]")

    stats = progress["stats"]
    completed_list = progress["completed_agents"].setdefault(folder_name, [])
    semaphore = asyncio.Semaphore(parallel)
    lock = asyncio.Lock()
    processed_count = len(completed)
    total_count = len(agents)
    unsaved = 0  # agents finished since the last flush

    progress_bar = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(bar_width=40),
        TaskProgressColumn(),
        TextColumn("•"),
        TimeElapsedColumn(),
        TextColumn("•"),
        TimeRemainingColumn(),
        console=console,
        refresh_per_second=2,
    )

    def flush() -> None:
        # Write tasks before the checkpoint. If the process dies between the
        # two writes, the checkpoint lags behind and those agents are redone.
        save_all_tasks(folder_name, all_tasks, agents)
        save_progress(progress)

    async def process_agent(agent: AgentEntry, task_id) -> bool:
        nonlocal processed_count, unsaved
        async with semaphore:
            agent_tasks = await generate_tasks_for_agent(agent, llm, language, timeout)

        async with lock:
            if agent_tasks:
                all_tasks[agent.agent_id] = agent_tasks.tasks
                stats["tasks_generated"] += len(agent_tasks.tasks)
                log(f"Generated {len(agent_tasks.tasks)} tasks for {agent.agent_id}")
            else:
                stats["errors"] += 1

            # NOTE(review): failed agents are recorded as completed too, so they
            # are not retried on resume; they only show up in stats["errors"].
            completed_list.append(agent.agent_id)

            processed_count += 1
            unsaved += 1
            # Incremented rather than overwritten with this folder's count, so
            # the total stays correct when several folders are processed.
            stats["total_agents"] += 1
            progress_bar.update(task_id, completed=processed_count)

            if unsaved >= save_every:
                flush()
                unsaved = 0

        return agent_tasks is not None

    try:
        with progress_bar:
            task_id = progress_bar.add_task(
                f"[cyan]{folder_name} ({lang_label})[/cyan]",
                total=total_count,
                completed=len(completed),
            )
            await asyncio.gather(*[process_agent(a, task_id) for a in pending_agents])
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run on Python 3.11+, Ctrl+C reaches this coroutine as
        # CancelledError rather than KeyboardInterrupt.
        console.print("\n[yellow]⚠ Interrupted! Saving progress...[/yellow]")
        raise
    finally:
        # Runs on success, interruption and unexpected errors alike.
        flush()

    console.print(
        f"[bold green]✓ Dataset '{folder_name}' complete: {processed_count}/{total_count} agents[/bold green]"
    )
    console.print(f"[green] → Saved to {get_tasks_file_path(folder_name)}[/green]")
    log(f"Dataset '{folder_name}' complete: {processed_count}/{total_count} agents")

    return stats
|
|
|
|
| |
| |
| |
|
|
|
|
def _print_run_configuration(model: str, parallel: int, timeout: int, retries: int) -> None:
    """Render the run configuration as a two-column rich table."""
    table = Table(title="Configuration", show_header=False)
    table.add_column("Parameter", style="cyan")
    table.add_column("Value", style="green")
    rows = (
        ("Input", str(AGENTS_NORM_DIR)),
        ("Output", str(OUTPUT_DIR)),
        ("Model", model),
        ("Parallel", str(parallel)),
        ("Timeout", f"{timeout}s"),
        ("Retries", str(retries)),
        ("Tasks per agent", str(TASKS_PER_AGENT)),
    )
    for label, value in rows:
        table.add_row(label, value)
    console.print(table)


def _print_run_summary(stats: dict) -> None:
    """Render the final statistics table (preceded by a blank line)."""
    console.print()
    summary = Table(title="Task Generation Complete", show_header=False)
    summary.add_column("Metric", style="cyan")
    summary.add_column("Value", style="green")
    error_count = stats["errors"]
    summary.add_row("Agents processed", str(stats["total_agents"]))
    summary.add_row("Tasks generated", f"[green]{stats['tasks_generated']}[/green]")
    summary.add_row("Errors", f"[red]{error_count}[/red]" if error_count > 0 else "0")
    summary.add_row("Output", str(OUTPUT_DIR))
    console.print(summary)


def _every_folder_done(folders: list[str], progress: dict) -> bool:
    """Return True when every folder's merged agents are all in the checkpoint."""
    for folder in folders:
        merged = load_merged_dataset(folder)
        done = set(progress["completed_agents"].get(folder, []))
        if len(done) < len(merged):
            return False
    return True


async def generate_all_tasks_async(
    api_key: str = DEFAULT_API_KEY,
    base_url: str = DEFAULT_BASE_URL,
    model: str = DEFAULT_MODEL,
    parallel: int = DEFAULT_PARALLEL,
    resume: bool = True,
    folders: list[str] | None = None,
    timeout: int = DEFAULT_TIMEOUT,
    retries: int = MAX_RETRIES,
):
    """Generate tasks for every requested dataset folder.

    Sets the module-level MAX_RETRIES (read by call_llm) to *retries* and
    prints the configuration. It then either loads the checkpoint
    (*resume*) or deletes it and starts empty, and processes each folder in
    turn with process_dataset. Folders default to DATASET_FOLDERS when
    *folders* is empty. At the end it prints a summary and deletes the
    checkpoint once every folder's agents are all recorded as completed. A
    KeyboardInterrupt raised inside the folder loop saves the checkpoint and
    returns early.
    """
    global MAX_RETRIES
    MAX_RETRIES = retries

    banner = "[bold cyan]🎯 Task Generator[/bold cyan]\nGenerate training tasks for each agent"
    console.print(Panel.fit(banner, border_style="cyan"))
    _print_run_configuration(model, parallel, timeout, retries)

    if not resume:
        clear_progress()
    progress = load_progress()
    if resume:
        total_completed = sum(map(len, progress["completed_agents"].values()))
        if total_completed > 0:
            console.print(f"\n[green]✓ Resuming: {total_completed} agents already processed[/green]")

    llm = create_llm(api_key, base_url, model)
    folders_to_process = folders or DATASET_FOLDERS

    divider = "=" * 80
    log(divider)
    log(f"Starting task generation: folders={folders_to_process}, parallel={parallel}, model={model}")
    log(divider)

    try:
        for folder_name in folders_to_process:
            await process_dataset(folder_name, llm, progress, parallel, timeout)
    except KeyboardInterrupt:
        console.print("\n[yellow]Saving progress and exiting...[/yellow]")
        save_progress(progress)
        console.print("[green]Progress saved. Run again to resume.[/green]")
        return

    stats = progress["stats"]
    _print_run_summary(stats)

    if _every_folder_done(folders_to_process, progress):
        clear_progress()
        console.print("\n[bold green]✓ All datasets processed successfully![/bold green]")

    log(divider)
    log(
        f"Task generation complete: agents={stats['total_agents']} tasks={stats['tasks_generated']} errors={stats['errors']}"
    )
    log(divider)
|
|
|
|
def generate_all_tasks(
    api_key: str = DEFAULT_API_KEY,
    base_url: str = DEFAULT_BASE_URL,
    model: str = DEFAULT_MODEL,
    parallel: int = DEFAULT_PARALLEL,
    resume: bool = True,
    folders: list[str] | None = None,
    timeout: int = DEFAULT_TIMEOUT,
    retries: int = MAX_RETRIES,
):
    """Blocking entry point: run generate_all_tasks_async via asyncio.run."""
    job = generate_all_tasks_async(
        api_key=api_key,
        base_url=base_url,
        model=model,
        parallel=parallel,
        resume=resume,
        folders=folders,
        timeout=timeout,
        retries=retries,
    )
    asyncio.run(job)
|
|
|
|
| |
| |
| |
|
|
|
|
def main():
    """Command-line entry point: parse arguments and run the task generator.

    Numeric options are validated up front. ``--parallel`` and ``--timeout``
    must be >= 1; ``-p 0`` used to create ``asyncio.Semaphore(0)``, which
    makes every request wait forever. ``--retries`` must be >= 0. Ctrl+C
    prints a short message. Any other exception is logged with its
    traceback, and the process exits with status 1.
    """

    def int_at_least(minimum: int):
        """Build an argparse ``type=`` converter rejecting integers below *minimum*."""

        def convert(raw: str) -> int:
            try:
                value = int(raw)
            except ValueError:
                raise argparse.ArgumentTypeError(f"invalid integer: {raw!r}") from None
            if value < minimum:
                raise argparse.ArgumentTypeError(f"must be >= {minimum}, got {value}")
            return value

        return convert

    parser = argparse.ArgumentParser(
        description="Generate training tasks for each agent in the dataset",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Run with default settings (resumes if interrupted)
python task_generator.py

# Run with 20 parallel requests
python task_generator.py -p 20

# Start fresh (ignore previous progress)
python task_generator.py --fresh

# Process specific folders only
python task_generator.py --folders agents_eng

# Custom API settings
python task_generator.py --base-url http://localhost:8080/v1 --model my-model

# Adjust timeout and retries
python task_generator.py --timeout 120 --retries 5
""",
    )

    parser.add_argument("--api-key", type=str, default=DEFAULT_API_KEY, help="LLM API key")

    parser.add_argument("--base-url", type=str, default=DEFAULT_BASE_URL, help="LLM API base URL")

    parser.add_argument("--model", type=str, default=DEFAULT_MODEL, help="LLM model identifier")

    parser.add_argument(
        "-p",
        "--parallel",
        type=int_at_least(1),
        default=DEFAULT_PARALLEL,
        help=f"Number of parallel agent requests (default: {DEFAULT_PARALLEL})",
    )

    parser.add_argument(
        "--fresh",
        action="store_true",
        help="Start fresh, ignoring any previous progress",
    )

    parser.add_argument(
        "--folders",
        nargs="+",
        default=None,
        help=f"Specific dataset folders to process (default: {DATASET_FOLDERS})",
    )

    parser.add_argument(
        "--timeout",
        type=int_at_least(1),
        default=DEFAULT_TIMEOUT,
        help=f"Timeout per LLM request in seconds (default: {DEFAULT_TIMEOUT})",
    )

    parser.add_argument(
        "--retries",
        type=int_at_least(0),
        default=MAX_RETRIES,
        help=f"Number of retries on failure (default: {MAX_RETRIES})",
    )

    args = parser.parse_args()

    try:
        generate_all_tasks(
            api_key=args.api_key,
            base_url=args.base_url,
            model=args.model,
            parallel=args.parallel,
            resume=not args.fresh,
            folders=args.folders,
            timeout=args.timeout,
            retries=args.retries,
        )
    except KeyboardInterrupt:
        console.print("\n[yellow]Exiting...[/yellow]")
    except Exception as e:
        console.print(f"\n[red]Fatal error: {e}[/red]")
        log(f"Fatal error: {e}\n{tb.format_exc()}", "FATAL")
        raise SystemExit(1)
|
|
|
|
# Script entry point: generation starts only when run directly, not on import.
if __name__ == "__main__":
    main()
|
|