#!/usr/bin/env python3 """ LLM Agent Factory v2 - Simplified Generates AgentSpec definitions by iterating through domains with parallel processing. For each domain: process each role separately, then refine the whole domain. """ import argparse import asyncio import json # Configuration import os import re import traceback as tb from datetime import datetime from pathlib import Path from langchain_core.messages import HumanMessage, SystemMessage from langchain_openai import ChatOpenAI from rich.console import Console from rich.panel import Panel from rich.progress import ( BarColumn, Progress, SpinnerColumn, TaskProgressColumn, TextColumn, TimeElapsedColumn, TimeRemainingColumn, ) from rich.table import Table DEFAULT_API_KEY = os.getenv("LLM_API_KEY", "") DEFAULT_BASE_URL = os.getenv("LLM_BASE_URL", "https://api.openai.com/v1") DEFAULT_MODEL = os.getenv("LLM_MODEL", "gpt-4") MAX_AGENTS_PER_COMBO = 3 MAX_RETRIES = 3 TEMPERATURE = 0.7 REQUEST_DELAY = 0.5 DEFAULT_TIMEOUT = 1800 # seconds per LLM request SCRIPT_DIR = Path(__file__).parent BASE_DIR = SCRIPT_DIR.parent CONFIG_DIR = BASE_DIR / "config" # Updated: agents are now stored in agents_database directory AGENTS_DIR = BASE_DIR / "agents_database" PROGRESS_FILE = SCRIPT_DIR / ".generation_progress.json" LLM_LOG_FILE = SCRIPT_DIR / "llm_debug.log" console = Console() def log_llm(msg: str): """Log LLM debug info to file.""" with open(LLM_LOG_FILE, "a", encoding="utf-8") as f: f.write(f"[{datetime.now().isoformat()}] {msg}\n") def load_catalogs(): """Load domain, role, and tool catalogs from JSON files.""" with open(CONFIG_DIR / "domain.json", encoding="utf-8") as f: domains = json.load(f).get("domain", []) with open(CONFIG_DIR / "role_id.json", encoding="utf-8") as f: roles = json.load(f).get("roles", []) with open(CONFIG_DIR / "tool.json", encoding="utf-8") as f: tools = json.load(f).get("tools", []) return domains, roles, tools # ═══════════════════════════════════════════════════════════════════════════════ # PROMPT BUILDING # ═══════════════════════════════════════════════════════════════════════════════ def build_combo_prompt( domain: str, role: str, all_tools: list[str], max_agents: int = MAX_AGENTS_PER_COMBO ) -> tuple[str, str]: """Build prompts for generating agents for ONE domain + ONE role combination.""" def build_combo_prompt( domain: str, role: str, all_tools: list[str], max_agents: int = MAX_AGENTS_PER_COMBO ) -> tuple[str, str]: """Build prompts for generating agents for ONE domain + ONE role combination.""" system_prompt = f"""You evaluate if a domain+role combination is logical and create agents ONLY if it makes real practical sense. ## OUTPUT FORMAT {{ "agents": [...], // array of agents OR empty array if combination is invalid "notes": "string" // explanation of your decision }} ## Agent Schema (when creating) {{ "agent_id": "string", // [a-z0-9_]+ slug "display_name": "string", // human name "persona": "string", // 2-3 sentences "description": "string", // what agent does "role_id": "{role}", // FIXED "domain": "{domain}", // FIXED "tools": [], // usually empty - see rules "input_schema": {{}}, // usually empty - see rules "output_schema": {{}}, // usually empty - see rules "raw": {{}} }} ## RULE 1: SOME COMBINATIONS ARE INVALID → 0 AGENTS Not all domain+role combinations make sense. Return empty array when combination is illogical. INVALID examples: - coder + Cooking → no code to write - debugger + Philosophy → nothing to debug - devops + Music → no infrastructure - architect + Sports → no systems to design - data_scientist + Folklore → no data to analyze Technical roles (coder, debugger, architect, devops, code_reviewer) are only valid for technical domains like Computing, Programming, Software, Engineering. ASK: "Would someone hire a {role} specifically for {domain}?" If NO → return {{"agents": [], "notes": "Invalid: "}} ## RULE 2: tools — use when agent needs external capabilities Available tools: - "web_search" — for agents that need current information from internet - "code_interpreter" — for agents that execute code or calculations - "file_search" — for agents that search through documents - "vector_search" — for semantic search in knowledge bases - "image_generation" — for agents that create images - "shell" — for agents that run system commands - "computer_use" — for agents that interact with computer UI - "apply_patch" — for agents that modify code files - "function_calling" — for agents that call external APIs - "remote_mcp_servers" — for connecting to external tool servers Most agents work with tools: [] because they process provided information. Add tools only when the agent's job requires external actions or data: EXAMPLES when to use tools: - Research assistant needing latest news → "web_search" - Code debugger that runs code → "code_interpreter" - File analyzer searching documents → "file_search" - API integrator calling services → "function_calling" EXAMPLES when NOT to use tools: - Writing assistant explaining concepts → tools: [] - Advisor giving recommendations → tools: [] - Teacher explaining topics → tools: [] ## RULE 3: input_schema = {{}} and output_schema = {{}} unless agent returns pure data Schemas are only for agents that return structured data with NO explanatory text. NEVER add schemas for agents that explain, advise, write, analyze, teach — their output is prose. ONLY add output_schema for agents returning pure data: - Calculator returning only numbers - Classifier returning only category - Extractor returning only JSON fields If agent writes any explanatory text → schemas: {{}} ## RULE 4: Maximum {max_agents} agents, prefer 0 or 1 or more Only create multiple agents if they serve completely different purposes. Return ONLY valid JSON.""" user_prompt = f"""Domain: "{domain}" Role: "{role}" Is this combination logical? Would someone hire a "{role}" for "{domain}"? If NO → return {{"agents": [], "notes": "Invalid because..."}} If YES → create 1 agent. Use tools: [] unless the agent needs external capabilities for its job.""" return system_prompt, user_prompt def create_llm_client( api_key: str = DEFAULT_API_KEY, base_url: str = DEFAULT_BASE_URL, model: str = DEFAULT_MODEL, ) -> ChatOpenAI: """Create LLM client.""" return ChatOpenAI(api_key=api_key, base_url=base_url, model=model, temperature=TEMPERATURE) async def call_llm(llm: ChatOpenAI, system_prompt: str, user_prompt: str) -> dict: """Call LLM and parse JSON response with retries.""" messages = [SystemMessage(content=system_prompt), HumanMessage(content=user_prompt)] req_id = f"{datetime.now().timestamp():.0f}"[-6:] # short request id for attempt in range(MAX_RETRIES + 1): start = datetime.now() try: log_llm(f"[{req_id}] REQUEST attempt={attempt} prompt={user_prompt[:80]!r}") response = await asyncio.wait_for(llm.ainvoke(messages), timeout=DEFAULT_TIMEOUT) elapsed = (datetime.now() - start).total_seconds() log_llm(f"[{req_id}] RESPONSE elapsed={elapsed:.2f}s len={len(response.content)}") log_llm(f"[{req_id}] CONTENT: {response.content}") json_text = extract_json_from_response(response.content) data = json.loads(json_text) if isinstance(data, dict): return data except Exception as e: elapsed = (datetime.now() - start).total_seconds() log_llm(f"[{req_id}] ERROR attempt={attempt} elapsed={elapsed:.2f}s type={type(e).__name__} error={e}") log_llm(f"[{req_id}] TRACEBACK: {tb.format_exc()}") if attempt < MAX_RETRIES: await asyncio.sleep(1) return {"agents": []} # Return valid empty structure after all retries def extract_json_from_response(response: str) -> str: """Extract JSON from response.""" text = response.strip() if text.startswith("```"): text = text[text.find("\n") + 1 :] text = text.removesuffix("```") text = text.strip() start = text.find("{") end = text.rfind("}") return text[start : end + 1] if start != -1 and end != -1 else text def validate_agent(agent: dict, domain: str, role: str, tools: list[str]) -> dict | None: """Validate and normalize a single agent.""" if not isinstance(agent, dict) or "agent_id" not in agent: return None agent_id = agent["agent_id"] if not re.match(r"^[a-z0-9_]+$", agent_id): return None # Normalize agent agent["domain"] = domain agent["role_id"] = role agent["tools"] = [t for t in agent.get("tools", []) if t in tools] agent.setdefault("input_schema", {}) agent.setdefault("output_schema", {}) agent.setdefault("raw", {}) return agent def load_progress() -> set: """Load progress data.""" if PROGRESS_FILE.exists(): data = json.load(open(PROGRESS_FILE, encoding="utf-8")) return set(data.get("completed_domains", [])) return set() def save_progress(completed_domains: set): """Save progress.""" json.dump( {"completed_domains": list(completed_domains)}, open(PROGRESS_FILE, "w", encoding="utf-8"), ensure_ascii=False, indent=2, ) def clear_progress(): """Clear progress file.""" if PROGRESS_FILE.exists(): PROGRESS_FILE.unlink() # ═══════════════════════════════════════════════════════════════════════════════ # OUTPUT # ═══════════════════════════════════════════════════════════════════════════════ def get_domain_file_path(domain: str) -> Path: """Get the file path for a domain's agents.""" filename = re.sub(r"[^\w\-]", "_", domain) + ".json" return AGENTS_DIR / filename def save_domain_agents(domain: str, agents: list[dict]) -> Path: """Save agents for a domain to its own JSON file.""" AGENTS_DIR.mkdir(exist_ok=True) output_path = get_domain_file_path(domain) result = { "domain": domain, "generated_at": datetime.now().isoformat(), "total_agents": len(agents), "agents": agents, } with open(output_path, "w", encoding="utf-8") as f: json.dump(result, f, ensure_ascii=False, indent=2) return output_path # ═══════════════════════════════════════════════════════════════════════════════ # COMBO PROCESSING (domain + role) # ═══════════════════════════════════════════════════════════════════════════════ async def process_combo(domain: str, role: str, tools: list[str], llm: ChatOpenAI) -> list[dict]: """Process domain+role combination.""" system_prompt, user_prompt = build_combo_prompt(domain, role, tools) for attempt in range(MAX_RETRIES + 1): try: data = await call_llm(llm, system_prompt, user_prompt) if not isinstance(data.get("agents"), list): if attempt < MAX_RETRIES: await asyncio.sleep(0.1) # Faster retry continue return [] valid_agents = [] for agent in data["agents"]: if validated := validate_agent(agent, domain, role, tools): valid_agents.append(validated) return valid_agents[:MAX_AGENTS_PER_COMBO] except Exception as e: if attempt < MAX_RETRIES: console.print(f"[yellow]Retry {attempt + 1}/{MAX_RETRIES} {domain}+{role}: {e}[/yellow]") await asyncio.sleep(0.5) else: console.print(f"[red]Failed {domain}+{role} after {MAX_RETRIES + 1} attempts: {e}[/red]") return [] return [] # ═══════════════════════════════════════════════════════════════════════════════ # DOMAIN PROCESSING # ═══════════════════════════════════════════════════════════════════════════════ async def process_domain(domain: str, roles: list[str], tools: list[str], llm: ChatOpenAI) -> list[dict]: """Process all roles for a domain and save to file.""" domain_agents = [] # Process each role for role in roles: console.print(f" [{domain}] Processing {role}...") agents = await process_combo(domain, role, tools, llm) domain_agents.extend(agents) await asyncio.sleep(REQUEST_DELAY) # Save domain agents to file output_path = save_domain_agents(domain, domain_agents) console.print(f"[bold green] [{domain}] Complete: {len(domain_agents)} agents → {output_path.name}[/bold green]") return domain_agents # ═══════════════════════════════════════════════════════════════════════════════ # MAIN GENERATION # ═══════════════════════════════════════════════════════════════════════════════ async def generate_all_agents_async( api_key: str = DEFAULT_API_KEY, base_url: str = DEFAULT_BASE_URL, model: str = DEFAULT_MODEL, resume: bool = True, parallel: int = 1, ) -> int: """Main async function to generate agents for all domains.""" console.print(Panel.fit("[bold cyan]LLM Agent Factory v2[/bold cyan]", border_style="cyan")) # Load catalogs domains, roles, tools = load_catalogs() total_combos = len(domains) * len(roles) # Show summary table = Table(title="Configuration", show_header=False) table.add_column("Parameter", style="cyan") table.add_column("Value", style="green") table.add_row("Domains", str(len(domains))) table.add_row("Roles", str(len(roles))) table.add_row("Total Combinations", f"[bold]{total_combos}[/bold]") table.add_row("Tools", str(len(tools))) table.add_row("Output Dir", str(AGENTS_DIR)) table.add_row("Model", model) table.add_row("Parallel", str(parallel)) table.add_row("Timeout", f"{DEFAULT_TIMEOUT}s") console.print(table) console.print() # Load progress if resume: completed_domains = load_progress() if completed_domains: console.print(f"[green]Resuming: {len(completed_domains)}/{len(domains)} domains completed[/green]\n") else: clear_progress() completed_domains = set() llm = create_llm_client(api_key, base_url, model) # Domain is pending if not completed OR file is missing def is_domain_pending(domain: str) -> bool: if domain not in completed_domains: return True return not get_domain_file_path(domain).exists() pending_domains = [d for d in domains if is_domain_pending(d)] if not pending_domains: console.print("[green]All domains already processed![/green]") return len(completed_domains) total_agents = 0 semaphore = asyncio.Semaphore(parallel) lock = asyncio.Lock() async def process_domain_with_semaphore(domain: str, progress_task) -> int: nonlocal total_agents, completed_domains async with semaphore: try: # Retry domain if 0 agents (likely a bug) for domain_attempt in range(MAX_RETRIES + 1): agents = await process_domain(domain, roles, tools, llm) if len(agents) > 0: break if domain_attempt < MAX_RETRIES: console.print( f"[yellow] [{domain}] 0 agents generated, retrying ({domain_attempt + 1}/{MAX_RETRIES})...[/yellow]" ) await asyncio.sleep(1) else: console.print(f"[red] [{domain}] 0 agents after {MAX_RETRIES + 1} attempts, skipping[/red]") async with lock: total_agents += len(agents) completed_domains.add(domain) progress.update( progress_task, advance=1, description=f"[green]Total: {total_agents}[/green]", ) save_progress(completed_domains) return len(agents) except Exception as e: console.print(f"[red] [{domain}] Error: {e}[/red]") return 0 # Process with progress bar with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), BarColumn(bar_width=40), TaskProgressColumn(), TextColumn("•"), TimeElapsedColumn(), TextColumn("•"), TimeRemainingColumn(), console=console, refresh_per_second=2, ) as progress: task = progress.add_task( "[cyan]Processing domains...", total=len(domains), completed=len(completed_domains), ) try: await asyncio.gather(*[process_domain_with_semaphore(domain, task) for domain in pending_domains]) except KeyboardInterrupt: console.print("\n\n[yellow]Interrupted! Saving progress...[/yellow]") save_progress(completed_domains) console.print("[green]Progress saved. Run again to resume.[/green]") raise # Final stats stats_table = Table(title="Generation Complete", show_header=False) stats_table.add_column("Metric", style="cyan") stats_table.add_column("Value", style="green") stats_table.add_row("Domains Processed", str(len(completed_domains))) stats_table.add_row("Total Agents", f"[bold]{total_agents}[/bold]") stats_table.add_row("Output Directory", str(AGENTS_DIR)) console.print(stats_table) if len(completed_domains) == len(domains): clear_progress() console.print("\n[green]All domains processed successfully![/green]") return total_agents def generate_all_agents( api_key: str = DEFAULT_API_KEY, base_url: str = DEFAULT_BASE_URL, model: str = DEFAULT_MODEL, resume: bool = True, parallel: int = 1, ) -> int: """Synchronous wrapper.""" return asyncio.run(generate_all_agents_async(api_key, base_url, model, resume, parallel)) # ═══════════════════════════════════════════════════════════════════════════════ # CLI INTERFACE # ═══════════════════════════════════════════════════════════════════════════════ def main(): parser = argparse.ArgumentParser(description="Generate LLM Agents for all domain+role combinations.") parser.add_argument("--api-key", default=DEFAULT_API_KEY, help="LLM API key") parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="LLM API base URL") parser.add_argument("--model", default=DEFAULT_MODEL, help="LLM model identifier") parser.add_argument( "--fresh", action="store_true", help="Start fresh, ignoring any previous progress", ) parser.add_argument( "-p", "--parallel", type=int, default=1, help="Number of domains to process in parallel (default: 1)", ) args = parser.parse_args() try: generate_all_agents(args.api_key, args.base_url, args.model, not args.fresh, args.parallel) except KeyboardInterrupt: console.print("\n[yellow]Exiting...[/yellow]") except Exception as e: console.print(f"\n[red]Fatal error: {e}[/red]") raise SystemExit(1) if __name__ == "__main__": main()