| """Task tool for delegating work to subagents.""" |
|
|
| import logging |
| import time |
| import uuid |
| from dataclasses import replace |
| from typing import Annotated, Literal |
|
|
| from langchain.tools import InjectedToolCallId, ToolRuntime, tool |
| from langgraph.config import get_stream_writer |
| from langgraph.typing import ContextT |
|
|
| from src.agents.lead_agent.prompt import get_skills_prompt_section |
| from src.agents.thread_state import ThreadState |
| from src.subagents import SubagentExecutor, get_subagent_config |
| from src.subagents.executor import SubagentStatus, get_background_task_result |
| from src.utils.runtime import get_runtime_thread_id |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
@tool("task", parse_docstring=True)
def task_tool(
    runtime: ToolRuntime[ContextT, ThreadState],
    description: str,
    prompt: str,
    subagent_type: Literal["general-purpose", "bash"],
    tool_call_id: Annotated[str, InjectedToolCallId],
    max_turns: int | None = None,
) -> str:
    """Delegate a task to a specialized subagent that runs in its own context.

    Subagents help you:
    - Preserve context by keeping exploration and implementation separate
    - Handle complex multi-step tasks autonomously
    - Execute commands or operations in isolated contexts

    Available subagent types:
    - **general-purpose**: A capable agent for complex, multi-step tasks that require
      both exploration and action. Use when the task requires complex reasoning,
      multiple dependent steps, or would benefit from isolated context.
    - **bash**: Command execution specialist for running bash commands. Use for
      git operations, build processes, or when command output would be verbose.

    When to use this tool:
    - Complex tasks requiring multiple steps or tools
    - Tasks that produce verbose output
    - When you want to isolate context from the main conversation
    - Parallel research or exploration tasks

    When NOT to use this tool:
    - Simple, single-step operations (use tools directly)
    - Tasks requiring user interaction or clarification

    Args:
        description: A short (3-5 word) description of the task for logging/display. ALWAYS PROVIDE THIS PARAMETER FIRST.
        prompt: The task description for the subagent. Be specific and clear about what needs to be done. ALWAYS PROVIDE THIS PARAMETER SECOND.
        subagent_type: The type of subagent to use. ALWAYS PROVIDE THIS PARAMETER THIRD.
        max_turns: Optional maximum number of agent turns. Defaults to subagent's configured max.
    """
    # NOTE(review): the docstring above is consumed by @tool(parse_docstring=True),
    # so it is runtime text rather than plain documentation; edit it deliberately.
    #
    # Return value: always a human-readable string -- either the subagent's
    # result ("Task Succeeded. ...") or an error/timeout description. Failures
    # are reported through the returned string and a stream event, not raised.

    # Seconds between polls of the background task. The same value drives both
    # the sleep and the poll budget so the two cannot drift apart.
    poll_interval_seconds = 5
    # Extra seconds granted on top of the subagent's own timeout before the
    # polling loop gives up on its own.
    poll_grace_seconds = 60

    config = get_subagent_config(subagent_type)
    if config is None:
        return f"Error: Unknown subagent type '{subagent_type}'. Available: general-purpose, bash"

    # Collect per-call overrides and apply them in one `replace` so the
    # registered config object itself is never mutated.
    overrides: dict = {}

    skills_section = get_skills_prompt_section()
    if skills_section:
        overrides["system_prompt"] = config.system_prompt + "\n\n" + skills_section

    if max_turns is not None:
        overrides["max_turns"] = max_turns

    if overrides:
        config = replace(config, **overrides)

    # Context inherited from the parent agent; everything stays None/empty when
    # the tool is invoked without a runtime.
    sandbox_state = None
    thread_data = None
    thread_id = None
    metadata: dict = {}

    if runtime is not None:
        runtime_state = runtime.state or {}
        sandbox_state = runtime_state.get("sandbox")
        thread_data = runtime_state.get("thread_data")
        thread_id = get_runtime_thread_id(runtime)

        if isinstance(runtime.config, dict):
            # `or {}` also covers a "metadata" key that is present but None.
            metadata = runtime.config.get("metadata") or {}

    parent_model = metadata.get("model_name")

    # Reuse the caller's trace id when provided; otherwise generate a short one
    # so log lines and the executor never see trace=None, even without a runtime.
    trace_id = metadata.get("trace_id") or str(uuid.uuid4())[:8]

    # Imported at call time rather than module level -- presumably to avoid a
    # circular import with src.tools (TODO confirm).
    from src.tools import get_available_tools

    # NOTE(review): subagent_enabled=False presumably keeps this task tool out
    # of the subagent's own toolset (no nested delegation) -- verify against
    # get_available_tools.
    tools = get_available_tools(model_name=parent_model, subagent_enabled=False)

    executor = SubagentExecutor(
        config=config,
        tools=tools,
        parent_model=parent_model,
        sandbox_state=sandbox_state,
        thread_data=thread_data,
        thread_id=thread_id,
        trace_id=trace_id,
    )

    # The tool call id doubles as the background task id, so stream events can
    # be matched to the originating tool call.
    task_id = executor.execute_async(prompt, task_id=tool_call_id)

    poll_count = 0
    last_status = None
    last_message_count = 0
    # Safety net only: the executor is expected to enforce its own timeout
    # first (see the log message emitted when this limit is hit).
    max_poll_count = (config.timeout_seconds + poll_grace_seconds) // poll_interval_seconds

    logger.info(f"[trace={trace_id}] Started background task {task_id} (subagent={subagent_type}, timeout={config.timeout_seconds}s, polling_limit={max_poll_count} polls)")

    writer = get_stream_writer()
    writer({"type": "task_started", "task_id": task_id, "description": description})

    while True:
        result = get_background_task_result(task_id)

        if result is None:
            logger.error(f"[trace={trace_id}] Task {task_id} not found in background tasks")
            writer({"type": "task_failed", "task_id": task_id, "error": "Task disappeared from background tasks"})
            return f"Error: Task {task_id} disappeared from background tasks"

        # Log status transitions only, not every poll.
        if result.status != last_status:
            logger.info(f"[trace={trace_id}] Task {task_id} status: {result.status.value}")
            last_status = result.status

        # Forward only the AI messages that appeared since the previous poll.
        current_message_count = len(result.ai_messages)
        if current_message_count > last_message_count:
            new_messages = result.ai_messages[last_message_count:current_message_count]
            # message_index is 1-based, matching the "#i/total" log format.
            for message_index, message in enumerate(new_messages, start=last_message_count + 1):
                writer(
                    {
                        "type": "task_running",
                        "task_id": task_id,
                        "message": message,
                        "message_index": message_index,
                        "total_messages": current_message_count,
                    }
                )
                logger.info(f"[trace={trace_id}] Task {task_id} sent message #{message_index}/{current_message_count}")
            last_message_count = current_message_count

        # Terminal states: emit the matching stream event and return.
        if result.status == SubagentStatus.COMPLETED:
            writer({"type": "task_completed", "task_id": task_id, "result": result.result})
            logger.info(f"[trace={trace_id}] Task {task_id} completed after {poll_count} polls")
            return f"Task Succeeded. Result: {result.result}"
        if result.status == SubagentStatus.FAILED:
            writer({"type": "task_failed", "task_id": task_id, "error": result.error})
            logger.error(f"[trace={trace_id}] Task {task_id} failed: {result.error}")
            return f"Task failed. Error: {result.error}"
        if result.status == SubagentStatus.TIMED_OUT:
            writer({"type": "task_timed_out", "task_id": task_id, "error": result.error})
            logger.warning(f"[trace={trace_id}] Task {task_id} timed out: {result.error}")
            return f"Task timed out. Error: {result.error}"

        # Still pending/running: wait before the next poll.
        time.sleep(poll_interval_seconds)
        poll_count += 1

        if poll_count > max_poll_count:
            timeout_minutes = config.timeout_seconds // 60
            timeout_message = f"Task polling timed out after {timeout_minutes} minutes. This may indicate the background task is stuck. Status: {result.status.value}"
            logger.error(f"[trace={trace_id}] Task {task_id} polling timed out after {poll_count} polls (should have been caught by thread pool timeout)")
            # Carry an "error" field like the other task_timed_out event so
            # stream consumers can treat both timeout paths uniformly.
            writer({"type": "task_timed_out", "task_id": task_id, "error": timeout_message})
            return timeout_message
|
|