#!/usr/bin/env python3
"""
Task Generator - Generate training tasks for each agent

For each agent in the dataset, generates TASKS_PER_AGENT (currently 11) unique
tasks that this specific agent would handle best. Creates a synthetic dataset
for training.

Steps:
1. Merge all domain files from agents_norm into unified dataset files
2. For each agent, generate tasks via LLM
3. Save results with progress tracking and resume capability
"""

import argparse
import asyncio
import itertools
import json
import os
import traceback as tb
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path

from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from rich.console import Console
from rich.panel import Panel
from rich.progress import (
    BarColumn,
    Progress,
    SpinnerColumn,
    TaskProgressColumn,
    TextColumn,
    TimeElapsedColumn,
    TimeRemainingColumn,
)
from rich.table import Table

# ═══════════════════════════════════════════════════════════════════════════════
# CONFIGURATION
# ═══════════════════════════════════════════════════════════════════════════════

DEFAULT_API_KEY = os.getenv("LLM_API_KEY", "")
DEFAULT_BASE_URL = os.getenv("LLM_BASE_URL", "https://api.openai.com/v1")
DEFAULT_MODEL = os.getenv("LLM_MODEL", "gpt-4")

TEMPERATURE = 0.7
MAX_RETRIES = 3
DEFAULT_TIMEOUT = 300
DEFAULT_PARALLEL = 10
# NOTE: the DIVERSITY REQUIREMENTS section of the system prompt enumerates
# exactly 11 slots; keep it in sync if this value changes.
TASKS_PER_AGENT = 11
# Minimum number of usable tasks a response must contain to be accepted.
MIN_VALID_TASKS = 5
# Number of newly completed agents to accumulate before checkpointing to disk.
SAVE_EVERY = 50

SCRIPT_DIR = Path(__file__).parent
BASE_DIR = SCRIPT_DIR.parent
DATASET_DIR = BASE_DIR / "dataset"
AGENTS_NORM_DIR = DATASET_DIR / "agents_norm"
OUTPUT_DIR = DATASET_DIR / "agent_tasks"
PROGRESS_FILE = SCRIPT_DIR / ".task_generator_progress.json"
LOG_FILE = SCRIPT_DIR / "task_generator.log"

# Dataset folders to process
DATASET_FOLDERS = ["agents_eng"]

console = Console()

# ═══════════════════════════════════════════════════════════════════════════════
# LOGGING
# ═══════════════════════════════════════════════════════════════════════════════


def log(msg: str, level: str = "INFO"):
    """Append a timestamped message with the given level to LOG_FILE."""
    timestamp = datetime.now().isoformat()
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(f"[{timestamp}] [{level}] {msg}\n")


def log_llm(msg: str):
    """Append a timestamped LLM debug message to LOG_FILE."""
    timestamp = datetime.now().isoformat()
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(f"[{timestamp}] [LLM] {msg}\n")


# ═══════════════════════════════════════════════════════════════════════════════
# DATA STRUCTURES
# ═══════════════════════════════════════════════════════════════════════════════


@dataclass
class AgentEntry:
    """Represents an agent with its source dataset."""

    agent_id: str
    display_name: str
    persona: str
    description: str
    role_id: str
    domain: str
    tools: list[str]
    dataset: str  # which dataset folder this came from

    def to_dict(self) -> dict:
        return {
            "agent_id": self.agent_id,
            "display_name": self.display_name,
            "persona": self.persona,
            "description": self.description,
            "role_id": self.role_id,
            "domain": self.domain,
            "tools": self.tools,
            "dataset": self.dataset,
        }


@dataclass
class AgentTasks:
    """Tasks generated for an agent."""

    agent_id: str
    dataset: str
    tasks: list[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        return {"agent_id": self.agent_id, "dataset": self.dataset, "tasks": self.tasks}


def _agent_from_dict(agent_data: dict, domain: str, dataset: str) -> AgentEntry:
    """Build an AgentEntry from a raw agent mapping, defaulting missing fields."""
    return AgentEntry(
        agent_id=agent_data.get("agent_id", ""),
        display_name=agent_data.get("display_name", ""),
        persona=agent_data.get("persona", ""),
        description=agent_data.get("description", ""),
        role_id=agent_data.get("role_id", ""),
        domain=domain,
        tools=agent_data.get("tools", []),
        dataset=dataset,
    )


# ═══════════════════════════════════════════════════════════════════════════════
# PROGRESS MANAGEMENT
# ═══════════════════════════════════════════════════════════════════════════════


def _write_json_atomic(path: Path, data: object) -> None:
    """Write *data* to *path* as pretty-printed UTF-8 JSON, atomically.

    The JSON goes to a sibling ``.tmp`` file first and is then moved over the
    target with ``os.replace``. An interrupted write therefore never leaves a
    truncated file behind. A truncated file would otherwise be read back as
    empty on the next run and then overwritten.
    """
    tmp_path = path.with_name(path.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    os.replace(tmp_path, path)


def _default_progress() -> dict:
    """Return a fresh, empty progress structure."""
    return {
        "merged_datasets": [],  # list of dataset folder names already merged
        "completed_agents": {},  # dataset -> list of agent_ids with generated tasks
        "stats": {"total_agents": 0, "tasks_generated": 0, "errors": 0},
    }


def load_progress() -> dict:
    """Load progress from the checkpoint file, or return a fresh structure.

    Keys missing from an older or partial checkpoint are filled with defaults.
    Callers can therefore index ``merged_datasets``, ``completed_agents`` and
    ``stats`` unconditionally.
    """
    defaults = _default_progress()
    if not PROGRESS_FILE.exists():
        return defaults
    try:
        with open(PROGRESS_FILE, encoding="utf-8") as f:
            data = json.load(f)
    except (ValueError, OSError) as e:
        log(f"Could not read progress file {PROGRESS_FILE}: {e}", "WARN")
        return defaults
    if not isinstance(data, dict):
        log(f"Ignoring malformed progress file {PROGRESS_FILE}", "WARN")
        return defaults
    for key, value in defaults.items():
        data.setdefault(key, value)
    for key, value in defaults["stats"].items():
        data["stats"].setdefault(key, value)
    return data


def save_progress(progress: dict):
    """Save progress to the checkpoint file (atomically)."""
    _write_json_atomic(PROGRESS_FILE, progress)


def clear_progress():
    """Delete the progress file if it exists."""
    PROGRESS_FILE.unlink(missing_ok=True)


# ═══════════════════════════════════════════════════════════════════════════════
# DATASET MERGING
# ═══════════════════════════════════════════════════════════════════════════════


def merge_dataset(folder_name: str) -> list[AgentEntry]:
    """Merge all domain files from a dataset folder into a list of agents.

    Each ``*.json`` file is expected to hold a ``domain`` (defaulting to the
    file stem) and an ``agents`` list. Agents without an ``agent_id`` are
    dropped. Unreadable files are logged and skipped.
    """
    folder_path = AGENTS_NORM_DIR / folder_name
    if not folder_path.exists():
        log(f"Folder '{folder_name}' not found", "WARN")
        return []

    agents = []
    json_files = sorted(folder_path.glob("*.json"))

    for json_file in json_files:
        try:
            with open(json_file, encoding="utf-8") as f:
                data = json.load(f)
            domain = data.get("domain", json_file.stem)
            for agent_data in data.get("agents", []):
                agent = _agent_from_dict(agent_data, domain, folder_name)
                if agent.agent_id:
                    agents.append(agent)
        except Exception as e:
            log(f"Error reading {json_file}: {e}", "ERROR")

    log(f"Merged {len(agents)} agents from {folder_name} ({len(json_files)} files)")
    return agents


def save_merged_dataset(folder_name: str, agents: list[AgentEntry]) -> Path:
    """Save the merged dataset to ``OUTPUT_DIR/<folder>/dataset.json`` and return its path."""
    dataset_dir = OUTPUT_DIR / folder_name
    dataset_dir.mkdir(parents=True, exist_ok=True)
    output_path = dataset_dir / "dataset.json"

    data = {
        "dataset": folder_name,
        "merged_at": datetime.now().isoformat(),
        "total_agents": len(agents),
        "agents": [a.to_dict() for a in agents],
    }

    _write_json_atomic(output_path, data)
    log(f"Saved merged dataset to {output_path}")
    return output_path


def load_merged_dataset(folder_name: str) -> list[AgentEntry]:
    """Load a previously merged dataset; returns [] if the file does not exist."""
    dataset_path = OUTPUT_DIR / folder_name / "dataset.json"
    if not dataset_path.exists():
        return []

    with open(dataset_path, encoding="utf-8") as f:
        data = json.load(f)

    agents = []
    for agent_data in data.get("agents", []):
        agent = _agent_from_dict(
            agent_data,
            agent_data.get("domain", ""),
            agent_data.get("dataset", folder_name),
        )
        if agent.agent_id:
            agents.append(agent)
    return agents


# ═══════════════════════════════════════════════════════════════════════════════
# LLM UTILITIES
# ═══════════════════════════════════════════════════════════════════════════════


def create_llm(api_key: str, base_url: str, model: str) -> ChatOpenAI:
    """Create the chat-model client used for all generation requests."""
    return ChatOpenAI(
        api_key=api_key,
        base_url=base_url,
        model=model,
        temperature=TEMPERATURE,
        max_tokens=4000,
    )


def _parse_json_object(text: str) -> dict | None:
    """Parse *text* as JSON, returning it only if it is a JSON object."""
    try:
        value = json.loads(text)
    except json.JSONDecodeError:
        return None
    return value if isinstance(value, dict) else None


def extract_json(text: str) -> dict | None:
    """Extract a JSON object from an LLM response.

    The text is tried as-is first. If that fails, a leading Markdown code
    fence is stripped, and the outermost ``{...}`` span is parsed. Returns
    None unless the result is a JSON *object*. Lists and scalars are rejected
    so callers can safely use ``.get``.
    """
    text = text.strip()

    result = _parse_json_object(text)
    if result is not None:
        return result

    # Remove markdown code blocks
    if text.startswith("```"):
        text = text[text.find("\n") + 1 :]
        text = text.removesuffix("```").strip()

    # Fall back to the outermost brace-delimited span
    start = text.find("{")
    end = text.rfind("}")
    if start != -1 and end > start:
        return _parse_json_object(text[start : end + 1])

    return None


# Per-process request counter. A timestamp-derived id collides for requests
# started in the same second, which is the norm with parallel workers.
_REQUEST_IDS = itertools.count(1)


async def call_llm(llm: ChatOpenAI, system_prompt: str, user_prompt: str, timeout: int) -> dict | None:
    """Call the LLM and parse a JSON object from its reply.

    Makes up to ``MAX_RETRIES + 1`` attempts. Each attempt is bounded by
    *timeout* seconds. Between attempts the delay grows exponentially
    (2, 4, 8, ... seconds). Returns the parsed dict, or None when every
    attempt fails or yields no non-empty JSON object.
    """
    messages = [SystemMessage(content=system_prompt), HumanMessage(content=user_prompt)]
    req_id = f"{next(_REQUEST_IDS):06d}"

    for attempt in range(MAX_RETRIES + 1):
        start = datetime.now()
        try:
            log_llm(f"[{req_id}] REQUEST attempt={attempt}/{MAX_RETRIES} prompt_len={len(user_prompt)}")
            response = await asyncio.wait_for(llm.ainvoke(messages), timeout=timeout)
            # Message content may be a list of content parts for some providers.
            content = response.content if isinstance(response.content, str) else str(response.content)
            elapsed = (datetime.now() - start).total_seconds()
            log_llm(f"[{req_id}] RESPONSE elapsed={elapsed:.2f}s len={len(content)}")
            log_llm(f"[{req_id}] CONTENT: {content[:500]}...")

            result = extract_json(content)
            if result:
                log_llm(f"[{req_id}] PARSED OK: tasks={len(result.get('tasks', []))}")
                return result
            log_llm(f"[{req_id}] JSON parse failed, will retry")
        except (asyncio.TimeoutError, TimeoutError):
            # asyncio.TimeoutError only became an alias of the builtin in 3.11.
            elapsed = (datetime.now() - start).total_seconds()
            log_llm(f"[{req_id}] TIMEOUT attempt={attempt} after {elapsed:.0f}s")
        except Exception as e:
            elapsed = (datetime.now() - start).total_seconds()
            error_type = type(e).__name__
            error_msg = str(e)[:200]
            if "524" in str(e) or "timeout" in str(e).lower():
                log_llm(f"[{req_id}] SERVER TIMEOUT (524) attempt={attempt} after {elapsed:.0f}s")
            elif "500" in str(e) or "502" in str(e) or "503" in str(e):
                log_llm(f"[{req_id}] SERVER ERROR attempt={attempt} after {elapsed:.0f}s: {error_msg}")
            else:
                log_llm(f"[{req_id}] ERROR attempt={attempt} elapsed={elapsed:.2f}s: {error_type}: {error_msg}")

        if attempt < MAX_RETRIES:
            wait_time = 2 ** (attempt + 1)
            log_llm(f"[{req_id}] Waiting {wait_time}s before retry...")
            await asyncio.sleep(wait_time)

    log_llm(f"[{req_id}] FAILED after {MAX_RETRIES + 1} attempts")
    return None


# ═══════════════════════════════════════════════════════════════════════════════
# TASK GENERATION PROMPTS
# ═══════════════════════════════════════════════════════════════════════════════

TASK_GENERATION_SYSTEM_PROMPT_EN = """You create realistic user requests for AI agent training data.

Your goal: generate {num_tasks} diverse, realistic user messages that THIS SPECIFIC AGENT would handle better than any other agent.

## CRITICAL: WHY THIS AGENT?

Each task MUST be one where THIS SPECIFIC AGENT excels over all others.
Before writing each task, ask yourself:
- "Would a general assistant handle this equally well?" → If YES, task is BAD
- "Would a different specialist agent be better for this?" → If YES, task is BAD
- "Does this task specifically need THIS agent's unique expertise?" → Must be YES

Read the agent's full profile — name, persona, description, tools.
The task should leverage THIS agent's specific knowledge, skills, and character.

## TASK TYPES — BE CREATIVE!

Tasks are NOT just questions! They can be:
- **Direct requests**: "Book me a hotel in Paris for next week", "Draw me a logo for my cafe"
- **Emotional support**: "I'm feeling overwhelmed, can we talk?", "I failed my exam and don't know what to do"
- **Action tasks**: "Schedule a meeting", "Find and compare prices", "Translate this document"
- **Creative generation**: "Write a poem about...", "Design a workout plan", "Compose a melody"
- **Complex problems with real data**: Full math problems with numbers, code debugging with actual code, legal cases with specifics
- **Roleplay/simulation**: "Pretend you're interviewing me for a job", "Let's practice a sales pitch"

For META-AGENTS (orchestrator, router, planner, etc.):
- "Optimize this multi-agent communication graph"
- "Which agents should handle this complex request?"
- "Plan execution order for these 5 subtasks"
- "The previous agent failed, decide on fallback strategy"

## DIVERSITY REQUIREMENTS ({num_tasks} tasks)

1-2: Simple beginner questions
3-4: Complex analysis or comparison
5-6: Action/execution requests (do something, not just explain)
7-8: Creative or emotional tasks
9-10: Real-world problems with specific details
11: **SUPER HARD** — a genuinely difficult problem that might stump even the LLM
    Examples: complex physics calculation with numbers, multi-step legal analysis, intricate code optimization, advanced mathematical proof

## REALISTIC USER VOICE

Write as REAL users would — natural, messy, contextual:
- "hey can you help me with..." (casual)
- "I need this urgently for tomorrow..." (stressed)
- "So I've been thinking about this for a while..." (conversational)
- Include typos occasionally, incomplete thoughts, real emotions

## AVOID

❌ Generic: "Help me with music"
❌ Only questions — include ACTION requests
❌ Robotic: "Provide comprehensive analysis"
❌ Same type repeated
❌ Tasks any general assistant could do

## OUTPUT FORMAT

Return ONLY valid JSON with exactly {num_tasks} tasks:
{{
  "tasks": [
    "First user request...",
    "Second user request...",
    ...
  ]
}}

No markdown, no explanations outside JSON."""

# The RU-mode system prompt is the English prompt with "Your goal:" shortened
# to "Goal:". The output language is selected in the user prompt instead.
TASK_GENERATION_SYSTEM_PROMPT_RU = TASK_GENERATION_SYSTEM_PROMPT_EN.replace("Your goal:", "Goal:", 1)


def build_task_prompt(agent: AgentEntry, language: str) -> tuple[str, str]:
    """Build the (system, user) prompt pair for generating an agent's tasks.

    Args:
        agent: The agent whose profile is embedded in the user prompt.
        language: ``"ru"`` requests Russian tasks; anything else requests English.
    """
    num_tasks = TASKS_PER_AGENT

    # Describe tools capability
    if agent.tools:
        tools_desc = f"🔧 Tools available: {', '.join(agent.tools)}"
    else:
        tools_desc = "🔧 No external tools (reasoning and knowledge only)"

    if language == "ru":
        template = TASK_GENERATION_SYSTEM_PROMPT_RU
        language_line = "Tasks in RUSSIAN language."
    else:
        template = TASK_GENERATION_SYSTEM_PROMPT_EN
        language_line = "Tasks in ENGLISH."

    system_prompt = template.format(num_tasks=num_tasks)
    user_prompt = f"""# Agent: {agent.display_name}

{agent.persona}

{agent.description}

{tools_desc}

---

Generate {num_tasks} tasks where THIS agent would OUTPERFORM any other agent.
For each task ask: "Why would I choose THIS agent over 1000 other AI specialists?"
If no clear answer — rewrite the task to be more specific to this agent's unique expertise.

{language_line} Varying complexity and type. Natural language.
Return only JSON."""

    return system_prompt, user_prompt


# ═══════════════════════════════════════════════════════════════════════════════
# TASK GENERATION
# ═══════════════════════════════════════════════════════════════════════════════


async def generate_tasks_for_agent(
    agent: AgentEntry, llm: ChatOpenAI, language: str, timeout: int
) -> AgentTasks | None:
    """Generate tasks for a single agent.

    Returns at most TASKS_PER_AGENT tasks. Returns None when the LLM call
    fails, when ``tasks`` is missing or not a list, or when fewer than
    MIN_VALID_TASKS entries are strings longer than 20 characters.
    """
    system_prompt, user_prompt = build_task_prompt(agent, language)
    result = await call_llm(llm, system_prompt, user_prompt, timeout)

    if not result:
        log(f"Failed to generate tasks for {agent.agent_id}", "WARN")
        return None

    tasks = result.get("tasks", [])
    if not isinstance(tasks, list) or not tasks:
        log(f"No tasks in response for {agent.agent_id}", "WARN")
        return None

    # Validate tasks are strings with reasonable length
    valid_tasks = [t for t in tasks if isinstance(t, str) and len(t) > 20]

    if len(valid_tasks) < MIN_VALID_TASKS:
        log(f"Too few valid tasks for {agent.agent_id}: {len(valid_tasks)}", "WARN")
        return None

    return AgentTasks(
        agent_id=agent.agent_id,
        dataset=agent.dataset,
        tasks=valid_tasks[:TASKS_PER_AGENT],
    )


def get_tasks_file_path(folder_name: str) -> Path:
    """Return the unified tasks file path for a dataset, creating its directory."""
    dataset_dir = OUTPUT_DIR / folder_name
    dataset_dir.mkdir(parents=True, exist_ok=True)
    return dataset_dir / "tasks.json"


def load_existing_tasks(folder_name: str) -> dict[str, list[str]]:
    """Load existing tasks from the unified file as ``{agent_id: tasks}``.

    Returns {} when the file is absent or unreadable. An unreadable file is
    logged rather than silently ignored.
    """
    tasks_file = get_tasks_file_path(folder_name)
    if not tasks_file.exists():
        return {}
    try:
        with open(tasks_file, encoding="utf-8") as f:
            data = json.load(f)
        return {item["agent_id"]: item["tasks"] for item in data.get("agents", [])}
    except (ValueError, OSError, KeyError, TypeError, AttributeError) as e:
        log(f"Could not read existing tasks from {tasks_file}: {e}", "WARN")
        return {}


def save_all_tasks(folder_name: str, all_tasks: dict[str, list[str]], agents: list[AgentEntry]):
    """Save all tasks to a single unified JSON file (atomically).

    Agents are written in *agents* order. Only agents that have an entry in
    *all_tasks* are included.
    """
    tasks_file = get_tasks_file_path(folder_name)

    agents_with_tasks = [
        {
            "agent_id": agent.agent_id,
            "display_name": agent.display_name,
            "domain": agent.domain,
            "role_id": agent.role_id,
            "tasks": all_tasks[agent.agent_id],
        }
        for agent in agents
        if agent.agent_id in all_tasks
    ]

    data = {
        "dataset": folder_name,
        "generated_at": datetime.now().isoformat(),
        "total_agents": len(agents_with_tasks),
        "total_tasks": sum(len(a["tasks"]) for a in agents_with_tasks),
        "agents": agents_with_tasks,
    }

    _write_json_atomic(tasks_file, data)
    log(f"Saved {len(agents_with_tasks)} agents with tasks to {tasks_file}")


def get_language_for_dataset(folder_name: str) -> str:
    """Determine the task language for a dataset.

    Currently always returns ``"en"``; *folder_name* is not consulted yet.
    """
    return "en"


# ═══════════════════════════════════════════════════════════════════════════════
# DATASET PROCESSING
# ═══════════════════════════════════════════════════════════════════════════════


def _load_or_merge_agents(folder_name: str, progress: dict) -> list[AgentEntry]:
    """Return a dataset's agents, (re)merging the source files when needed.

    A merge happens when the dataset was never merged, or when its merged
    ``dataset.json`` is missing or empty.
    """
    if folder_name in progress["merged_datasets"]:
        console.print(f"\n[bold cyan]📦 Loading merged {folder_name}...[/bold cyan]")
        agents = load_merged_dataset(folder_name)
        if agents:
            console.print(f"[green]✓ Loaded {len(agents)} agents[/green]")
            return agents
        log(f"Merged dataset for '{folder_name}' missing or empty, re-merging", "WARN")

    console.print(f"\n[bold cyan]📦 Merging {folder_name}...[/bold cyan]")
    agents = merge_dataset(folder_name)
    if not agents:
        console.print(f"[yellow]⚠ No agents found in {folder_name}, skipping[/yellow]")
        return []

    save_merged_dataset(folder_name, agents)
    if folder_name not in progress["merged_datasets"]:
        progress["merged_datasets"].append(folder_name)
    save_progress(progress)
    console.print(f"[green]✓ Merged {len(agents)} agents from {folder_name}[/green]")
    return agents


async def process_dataset(folder_name: str, llm: ChatOpenAI, progress: dict, parallel: int, timeout: int) -> dict:
    """Process all agents in a dataset: merge, then generate tasks.

    Only agents whose tasks were generated successfully are recorded in
    ``progress["completed_agents"]``. Failed agents stay pending so a later
    run retries them.

    Tasks and progress are flushed together, tasks first, every SAVE_EVERY
    successes and once more on exit. This includes exit by interruption or
    cancellation. As a result the progress file never lists an agent whose
    tasks are not on disk.

    Returns:
        The shared ``progress["stats"]`` dict, mutated in place.
    """
    language = get_language_for_dataset(folder_name)
    lang_label = "Russian" if language == "ru" else "English"

    agents = _load_or_merge_agents(folder_name, progress)
    if not agents:
        return progress["stats"]

    # Load existing tasks from unified file
    all_tasks = load_existing_tasks(folder_name)

    # Only trust completions whose tasks are actually present in tasks.json.
    # This also repairs checkpoints from older runs that marked failed agents
    # as done or lost unsaved tasks.
    completed_ids = progress["completed_agents"].setdefault(folder_name, [])
    completed_ids[:] = [aid for aid in completed_ids if aid in all_tasks]
    completed = set(completed_ids)
    pending_agents = [a for a in agents if a.agent_id not in completed]

    if not pending_agents:
        console.print(f"[green]✓ All {len(agents)} agents already processed[/green]")
        return progress["stats"]

    console.print(f"[cyan]🤖 Generating tasks for {len(pending_agents)} agents ({lang_label})...[/cyan]")

    stats = progress["stats"]
    semaphore = asyncio.Semaphore(parallel)
    lock = asyncio.Lock()
    processed_count = len(completed)
    total_count = len(agents)
    unsaved = 0  # successes recorded since the last checkpoint

    progress_bar = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(bar_width=40),
        TaskProgressColumn(),
        TextColumn("•"),
        TimeElapsedColumn(),
        TextColumn("•"),
        TimeRemainingColumn(),
        console=console,
        refresh_per_second=2,
    )

    def checkpoint() -> None:
        """Flush tasks, then progress, to disk."""
        nonlocal unsaved
        # Order matters: an agent may only appear as completed in the progress
        # file once its tasks have been written.
        save_all_tasks(folder_name, all_tasks, agents)
        save_progress(progress)
        unsaved = 0

    async def process_agent(agent: AgentEntry, task_id) -> bool:
        nonlocal processed_count, unsaved
        async with semaphore:
            agent_tasks = await generate_tasks_for_agent(agent, llm, language, timeout)

        # No awaits inside this section, so shared state is updated atomically
        # with respect to other workers and to cancellation.
        async with lock:
            processed_count += 1
            progress_bar.update(task_id, completed=processed_count)

            if agent_tasks is None:
                # Deliberately not marked completed so the next run retries it.
                stats["errors"] += 1
                return False

            all_tasks[agent.agent_id] = agent_tasks.tasks
            stats["tasks_generated"] += len(agent_tasks.tasks)
            completed_ids.append(agent.agent_id)
            # Count across all datasets instead of overwriting per dataset.
            stats["total_agents"] = sum(len(ids) for ids in progress["completed_agents"].values())
            log(f"Generated {len(agent_tasks.tasks)} tasks for {agent.agent_id}")

            unsaved += 1
            if unsaved >= SAVE_EVERY:
                checkpoint()
            return True

    try:
        with progress_bar:
            task_id = progress_bar.add_task(
                f"[cyan]{folder_name} ({lang_label})[/cyan]",
                total=total_count,
                completed=len(completed),
            )
            await asyncio.gather(*[process_agent(a, task_id) for a in pending_agents])
    except (KeyboardInterrupt, asyncio.CancelledError):
        # On Ctrl+C asyncio.run cancels the running task, so CancelledError is
        # what usually arrives here, not KeyboardInterrupt.
        console.print("\n[yellow]⚠ Interrupted! Saving progress...[/yellow]")
        raise
    finally:
        # Final save on success; consistency-preserving save on interruption or error.
        checkpoint()

    succeeded = len(set(completed_ids))
    console.print(f"[bold green]✓ Dataset '{folder_name}' complete: {succeeded}/{total_count} agents[/bold green]")
    if succeeded < total_count:
        console.print(f"[yellow]⚠ {total_count - succeeded} agents failed; run again to retry them[/yellow]")
    console.print(f"[green]  → Saved to {get_tasks_file_path(folder_name)}[/green]")
    log(f"Dataset '{folder_name}' complete: {succeeded}/{total_count} agents")

    return stats


# ═══════════════════════════════════════════════════════════════════════════════
# MAIN
# ═══════════════════════════════════════════════════════════════════════════════


async def generate_all_tasks_async(
    api_key: str = DEFAULT_API_KEY,
    base_url: str = DEFAULT_BASE_URL,
    model: str = DEFAULT_MODEL,
    parallel: int = DEFAULT_PARALLEL,
    resume: bool = True,
    folders: list[str] | None = None,
    timeout: int = DEFAULT_TIMEOUT,
    retries: int = MAX_RETRIES,
):
    """Generate tasks for every agent in the selected dataset folders.

    Args:
        resume: When False, the checkpoint file is deleted before starting.
        folders: Dataset folders to process; defaults to DATASET_FOLDERS.
        retries: Overrides the module-level MAX_RETRIES that call_llm reads.
    """
    global MAX_RETRIES
    MAX_RETRIES = retries

    console.print(
        Panel.fit(
            "[bold cyan]🎯 Task Generator[/bold cyan]\nGenerate training tasks for each agent",
            border_style="cyan",
        )
    )

    # Show config
    table = Table(title="Configuration", show_header=False)
    table.add_column("Parameter", style="cyan")
    table.add_column("Value", style="green")
    table.add_row("Input", str(AGENTS_NORM_DIR))
    table.add_row("Output", str(OUTPUT_DIR))
    table.add_row("Model", model)
    table.add_row("Parallel", str(parallel))
    table.add_row("Timeout", f"{timeout}s")
    table.add_row("Retries", str(retries))
    table.add_row("Tasks per agent", str(TASKS_PER_AGENT))
    console.print(table)

    # Load or initialize progress
    if resume:
        progress = load_progress()
        total_completed = sum(len(v) for v in progress["completed_agents"].values())
        if total_completed > 0:
            console.print(f"\n[green]✓ Resuming: {total_completed} agents already processed[/green]")
    else:
        clear_progress()
        progress = load_progress()

    # Create LLM client
    llm = create_llm(api_key, base_url, model)

    # Process datasets
    folders_to_process = folders if folders else DATASET_FOLDERS

    log("=" * 80)
    log(f"Starting task generation: folders={folders_to_process}, parallel={parallel}, model={model}")
    log("=" * 80)

    try:
        for folder_name in folders_to_process:
            await process_dataset(folder_name, llm, progress, parallel, timeout)
    except (KeyboardInterrupt, asyncio.CancelledError):
        # process_dataset has already flushed tasks + progress; save again in
        # case the interruption happened between datasets.
        console.print("\n[yellow]Saving progress and exiting...[/yellow]")
        save_progress(progress)
        console.print("[green]Progress saved. Run again to resume.[/green]")
        return

    # Final stats
    stats = progress["stats"]
    console.print()
    stats_table = Table(title="Task Generation Complete", show_header=False)
    stats_table.add_column("Metric", style="cyan")
    stats_table.add_column("Value", style="green")
    stats_table.add_row("Agents processed", str(stats["total_agents"]))
    stats_table.add_row("Tasks generated", f"[green]{stats['tasks_generated']}[/green]")
    stats_table.add_row("Errors", f"[red]{stats['errors']}[/red]" if stats["errors"] > 0 else "0")
    stats_table.add_row("Output", str(OUTPUT_DIR))
    console.print(stats_table)

    # Check if all complete; keep the checkpoint otherwise so failures are retried.
    all_complete = True
    for folder in folders_to_process:
        agents = load_merged_dataset(folder)
        completed = set(progress["completed_agents"].get(folder, []))
        if len(completed) < len(agents):
            all_complete = False
            break

    if all_complete:
        clear_progress()
        console.print("\n[bold green]✓ All datasets processed successfully![/bold green]")
    else:
        save_progress(progress)
        console.print("\n[yellow]Some agents still have no tasks. Run again to retry them.[/yellow]")

    log("=" * 80)
    log(
        f"Task generation complete: agents={stats['total_agents']} tasks={stats['tasks_generated']} errors={stats['errors']}"
    )
    log("=" * 80)


def generate_all_tasks(
    api_key: str = DEFAULT_API_KEY,
    base_url: str = DEFAULT_BASE_URL,
    model: str = DEFAULT_MODEL,
    parallel: int = DEFAULT_PARALLEL,
    resume: bool = True,
    folders: list[str] | None = None,
    timeout: int = DEFAULT_TIMEOUT,
    retries: int = MAX_RETRIES,
):
    """Synchronous wrapper around generate_all_tasks_async."""
    asyncio.run(generate_all_tasks_async(api_key, base_url, model, parallel, resume, folders, timeout, retries))


# ═══════════════════════════════════════════════════════════════════════════════
# CLI
# ═══════════════════════════════════════════════════════════════════════════════


def main():
    """Parse command-line arguments and run the generator."""
    parser = argparse.ArgumentParser(
        description="Generate training tasks for each agent in the dataset",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Run with default settings (resumes if interrupted)
  python task_generator.py

  # Run with 20 parallel requests
  python task_generator.py -p 20

  # Start fresh (ignore previous progress)
  python task_generator.py --fresh

  # Process specific folders only
  python task_generator.py --folders agents_eng

  # Custom API settings
  python task_generator.py --base-url http://localhost:8080/v1 --model my-model

  # Adjust timeout and retries
  python task_generator.py --timeout 120 --retries 5
""",
    )
    parser.add_argument("--api-key", type=str, default=DEFAULT_API_KEY, help="LLM API key")
    parser.add_argument("--base-url", type=str, default=DEFAULT_BASE_URL, help="LLM API base URL")
    parser.add_argument("--model", type=str, default=DEFAULT_MODEL, help="LLM model identifier")
    parser.add_argument(
        "-p",
        "--parallel",
        type=int,
        default=DEFAULT_PARALLEL,
        help=f"Number of parallel agent requests (default: {DEFAULT_PARALLEL})",
    )
    parser.add_argument(
        "--fresh",
        action="store_true",
        help="Start fresh, ignoring any previous progress",
    )
    parser.add_argument(
        "--folders",
        nargs="+",
        default=None,
        help=f"Specific dataset folders to process (default: {DATASET_FOLDERS})",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=DEFAULT_TIMEOUT,
        help=f"Timeout per LLM request in seconds (default: {DEFAULT_TIMEOUT})",
    )
    parser.add_argument(
        "--retries",
        type=int,
        default=MAX_RETRIES,
        help=f"Number of retries on failure (default: {MAX_RETRIES})",
    )

    args = parser.parse_args()

    try:
        generate_all_tasks(
            api_key=args.api_key,
            base_url=args.base_url,
            model=args.model,
            parallel=args.parallel,
            resume=not args.fresh,
            folders=args.folders,
            timeout=args.timeout,
            retries=args.retries,
        )
    except KeyboardInterrupt:
        console.print("\n[yellow]Exiting...[/yellow]")
    except Exception as e:
        console.print(f"\n[red]Fatal error: {e}[/red]")
        log(f"Fatal error: {e}\n{tb.format_exc()}", "FATAL")
        raise SystemExit(1)


if __name__ == "__main__":
    main()