| """Subagent execution engine.""" |
|
|
| import logging |
| import threading |
| import uuid |
| from concurrent.futures import Future, ThreadPoolExecutor |
| from concurrent.futures import TimeoutError as FuturesTimeoutError |
| from dataclasses import dataclass |
| from datetime import datetime |
| from enum import Enum |
| from typing import Any |
|
|
| from langchain.agents import create_agent |
| from langchain.tools import BaseTool |
| from langchain_core.messages import AIMessage, HumanMessage |
| from langchain_core.runnables import RunnableConfig |
|
|
| from src.agents.thread_state import SandboxState, ThreadDataState, ThreadState |
| from src.models import create_chat_model |
| from src.subagents.config import SubagentConfig |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class SubagentStatus(Enum):
    """Lifecycle states recorded on a SubagentResult.

    The values are plain lowercase strings.
    """

    # Assigned by execute_async when the result is registered, before the
    # background wrapper has started.
    PENDING = "pending"
    # Assigned when the background wrapper starts, or by execute() when it
    # creates its own result object.
    RUNNING = "running"
    # Assigned by execute() once a final result string has been stored.
    COMPLETED = "completed"
    # Assigned when execute() or the background wrapper catches an exception.
    FAILED = "failed"
    # Assigned by the background wrapper when waiting for execute() exceeds
    # config.timeout_seconds.
    TIMED_OUT = "timed_out"
|
|
|
|
@dataclass
class SubagentResult:
    """Mutable record describing one subagent run and its outcome.

    Attributes:
        task_id: Identifier of this particular execution.
        trace_id: Trace identifier shared with the parent so that log lines
            from both sides can be correlated.
        status: Lifecycle state of the execution.
        result: Final result text; populated once the run has completed.
        error: Error description; populated when the run failed.
        started_at: Timestamp taken when the run began.
        completed_at: Timestamp taken when the run ended.
        ai_messages: Complete AI messages (as dicts) collected while the
            run was in progress. ``None`` is replaced by a fresh list.
    """

    task_id: str
    trace_id: str
    status: SubagentStatus
    result: str | None = None
    error: str | None = None
    started_at: datetime | None = None
    completed_at: datetime | None = None
    ai_messages: list[dict[str, Any]] | None = None

    def __post_init__(self):
        """Give every instance its own message list when none was supplied."""
        # A caller-supplied list is kept as the very same object; only a
        # missing value (None) is swapped for a new empty list.
        self.ai_messages = self.ai_messages if self.ai_messages is not None else []
|
|
|
|
| |
# Registry of background executions keyed by task_id. Entries are added by
# SubagentExecutor.execute_async and read through get_background_task_result /
# list_background_tasks; nothing in this file removes them.
_background_tasks: dict[str, SubagentResult] = {}
# Held while reading or writing _background_tasks and while the async wrapper
# in execute_async updates a stored result.
_background_tasks_lock = threading.Lock()


# Runs the per-task wrapper submitted by execute_async; that wrapper blocks
# waiting for the execution future and records the outcome.
_scheduler_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-scheduler-")


# Runs SubagentExecutor.execute itself, on a separate pool so the scheduler
# wrapper can wait on the resulting future with a timeout.
_execution_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-exec-")
|
|
|
|
def _filter_tools(
    all_tools: list[BaseTool],
    allowed: list[str] | None,
    disallowed: list[str] | None,
) -> list[BaseTool]:
    """Select the tools a subagent may use.

    Args:
        all_tools: Every tool that is available.
        allowed: Allowlist of tool names, or None for "no allowlist". When
            given, a tool must be named here to be kept.
        disallowed: Denylist of tool names, or None for "no denylist". A tool
            named here is dropped even if it is also on the allowlist.

    Returns:
        The tools that pass both checks, in their original order. When
        neither list is given, ``all_tools`` itself is returned unchanged.
    """
    # Nothing to filter: hand back the caller's list object untouched.
    if allowed is None and disallowed is None:
        return all_tools

    permitted_names = None if allowed is None else set(allowed)
    blocked_names = set() if disallowed is None else set(disallowed)

    return [
        tool
        for tool in all_tools
        if (permitted_names is None or tool.name in permitted_names)
        and tool.name not in blocked_names
    ]
|
|
|
|
def _get_model_name(config: SubagentConfig, parent_model: str | None) -> str | None:
    """Decide which model name a subagent should run with.

    Args:
        config: Subagent configuration; its ``model`` field is consulted.
        parent_model: Model name used by the parent agent.

    Returns:
        ``parent_model`` when the configuration says ``"inherit"``, otherwise
        the configured model name as-is (which may be None for the default).
    """
    inherits_from_parent = config.model == "inherit"
    return parent_model if inherits_from_parent else config.model
|
|
|
|
class SubagentExecutor:
    """Run a configured subagent, synchronously or in the background.

    An executor is bound to one SubagentConfig and one filtered tool set.
    ``execute`` runs a task on the calling thread; ``execute_async`` registers
    a SubagentResult in the module-level registry and runs the task on the
    module's thread pools, enforcing ``config.timeout_seconds``.
    """

    def __init__(
        self,
        config: SubagentConfig,
        tools: list[BaseTool],
        parent_model: str | None = None,
        sandbox_state: SandboxState | None = None,
        thread_data: ThreadDataState | None = None,
        thread_id: str | None = None,
        trace_id: str | None = None,
    ):
        """Initialize the executor.

        Args:
            config: Subagent configuration.
            tools: List of all available tools (will be filtered by the
                config's ``tools`` allowlist and ``disallowed_tools`` denylist).
            parent_model: The parent agent's model name, used when the config
                asks to inherit the model.
            sandbox_state: Sandbox state from the parent agent; copied into
                the initial state when not None.
            thread_data: Thread data from the parent agent; copied into the
                initial state when not None.
            thread_id: Thread ID forwarded to the agent run configuration.
            trace_id: Trace ID from the parent. When omitted (or empty), the
                first 8 characters of a random UUID are used.
        """
        self.config = config
        self.parent_model = parent_model
        self.sandbox_state = sandbox_state
        self.thread_data = thread_data
        self.thread_id = thread_id
        # Reuse the parent's trace ID when available so log lines correlate.
        self.trace_id = trace_id or str(uuid.uuid4())[:8]

        self.tools = _filter_tools(
            tools,
            config.tools,
            config.disallowed_tools,
        )

        logger.info(f"[trace={self.trace_id}] SubagentExecutor initialized: {config.name} with {len(self.tools)} tools")

    def _create_agent(self):
        """Create the agent instance for one execution.

        Returns:
            The object returned by ``create_agent`` for the resolved model,
            the filtered tools, the config's system prompt and ThreadState.
        """
        model_name = _get_model_name(self.config, self.parent_model)
        model = create_chat_model(name=model_name, thinking_enabled=False)

        # Imported at call time rather than at module import time.
        # NOTE(review): presumably this avoids a circular import - confirm
        # before hoisting these to the top of the file.
        from src.agents.middlewares.thread_data_middleware import ThreadDataMiddleware
        from src.sandbox.middleware import SandboxMiddleware

        # NOTE(review): presumably these let the subagent's tools reach the
        # sandbox / thread data passed in the initial state - verify against
        # the middleware implementations.
        middlewares = [
            ThreadDataMiddleware(lazy_init=True),
            SandboxMiddleware(lazy_init=True),
        ]

        return create_agent(
            model=model,
            tools=self.tools,
            middleware=middlewares,
            system_prompt=self.config.system_prompt,
            state_schema=ThreadState,
        )

    def _build_initial_state(self, task: str) -> dict[str, Any]:
        """Build the initial state for agent execution.

        Args:
            task: The task description.

        Returns:
            A state dict whose ``messages`` holds one HumanMessage with the
            task, plus ``sandbox`` / ``thread_data`` when the executor was
            given them.
        """
        state: dict[str, Any] = {
            "messages": [HumanMessage(content=task)],
        }

        if self.sandbox_state is not None:
            state["sandbox"] = self.sandbox_state
        if self.thread_data is not None:
            state["thread_data"] = self.thread_data

        return state

    @staticmethod
    def _capture_ai_message(result: SubagentResult, message: AIMessage) -> bool:
        """Append ``message`` (as a dict) to ``result.ai_messages`` unless already there.

        A message with a truthy ``id`` is a duplicate when a stored message
        has the same ``id``; a message without one is a duplicate when an
        equal dict is already stored.

        Returns:
            True when the message was appended, False when it was a duplicate.
        """
        message_dict = message.model_dump()
        message_id = message_dict.get("id")
        if message_id:
            is_duplicate = any(seen.get("id") == message_id for seen in result.ai_messages)
        else:
            is_duplicate = message_dict in result.ai_messages

        if is_duplicate:
            return False
        result.ai_messages.append(message_dict)
        return True

    @staticmethod
    def _content_to_text(content: Any) -> str:
        """Flatten AI message content into a single string.

        Strings are returned unchanged. For a list, string items and the
        ``"text"`` value of dict items are joined with newlines (other items
        are skipped); a list yielding nothing produces a fixed placeholder.
        Any other content is passed through ``str``.
        """
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            text_parts = []
            for block in content:
                if isinstance(block, str):
                    text_parts.append(block)
                elif isinstance(block, dict) and "text" in block:
                    text_parts.append(block["text"])
            return "\n".join(text_parts) if text_parts else "No text content in response"
        return str(content)

    def _extract_final_text(self, final_state: dict[str, Any] | None) -> str:
        """Pick the result text out of the last streamed state.

        Preference order: the most recent AIMessage, then the last message of
        any type, then the placeholder ``"No response generated"``.
        """
        if final_state is None:
            logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no final state")
            return "No response generated"

        messages = final_state.get("messages", [])
        logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} final messages count: {len(messages)}")

        last_ai_message = next((msg for msg in reversed(messages) if isinstance(msg, AIMessage)), None)
        if last_ai_message is not None:
            return self._content_to_text(last_ai_message.content)

        if messages:
            last_message = messages[-1]
            logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no AIMessage found, using last message: {type(last_message)}")
            return str(last_message.content) if hasattr(last_message, "content") else str(last_message)

        logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no messages in final state")
        return "No response generated"

    def _finalize(
        self,
        result: SubagentResult,
        status: SubagentStatus,
        *,
        output: str | None = None,
        error: str | None = None,
    ) -> bool:
        """Record the terminal outcome of ``execute`` on ``result``.

        The write happens under ``_background_tasks_lock`` and is skipped when
        the result is already TIMED_OUT: a timed-out background run keeps
        executing on its worker thread, and its late outcome must not replace
        the timeout that was already reported.

        Returns:
            True when the outcome was recorded, False when it was discarded.
        """
        with _background_tasks_lock:
            already_timed_out = result.status is SubagentStatus.TIMED_OUT
            if not already_timed_out:
                result.status = status
                if output is not None:
                    result.result = output
                if error is not None:
                    result.error = error
                result.completed_at = datetime.now()

        if already_timed_out:
            logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} finished after its timeout; keeping timed_out status")
        return not already_timed_out

    def execute(self, task: str, result_holder: SubagentResult | None = None) -> SubagentResult:
        """Execute a task synchronously.

        Exceptions raised while creating or running the agent are caught,
        logged, and reported through the returned result (status FAILED,
        ``error`` set) rather than propagated.

        Args:
            task: The task description for the subagent.
            result_holder: Optional pre-created result object to update in
                place during execution. When omitted, a new RUNNING result
                with a random 8-character task ID is created.

        Returns:
            The SubagentResult (``result_holder`` itself when one was given).
        """
        if result_holder is not None:
            result = result_holder
        else:
            result = SubagentResult(
                task_id=str(uuid.uuid4())[:8],
                trace_id=self.trace_id,
                status=SubagentStatus.RUNNING,
                started_at=datetime.now(),
            )

        try:
            agent = self._create_agent()
            state = self._build_initial_state(task)

            run_config: RunnableConfig = {
                "recursion_limit": self.config.max_turns,
            }
            context = {}
            if self.thread_id:
                run_config["configurable"] = {"thread_id": self.thread_id}
                context["thread_id"] = self.thread_id

            logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} starting execution with max_turns={self.config.max_turns}")

            # Each streamed chunk is treated as a state snapshot; the last one
            # seen is the final state. Along the way, every new AIMessage at
            # the tail of the message list is recorded on the result.
            final_state = None
            for chunk in agent.stream(state, config=run_config, context=context, stream_mode="values"):
                final_state = chunk

                messages = chunk.get("messages", [])
                if messages and isinstance(messages[-1], AIMessage):
                    if self._capture_ai_message(result, messages[-1]):
                        logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} captured AI message #{len(result.ai_messages)}")

            logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} completed execution")

            output = self._extract_final_text(final_state)
            self._finalize(result, SubagentStatus.COMPLETED, output=output)

        except Exception as e:
            logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} execution failed")
            self._finalize(result, SubagentStatus.FAILED, error=str(e))

        return result

    def execute_async(self, task: str, task_id: str | None = None) -> str:
        """Start a task execution in the background.

        A PENDING SubagentResult is stored in the module-level registry under
        the task ID, then a wrapper is submitted to the scheduler pool. The
        wrapper runs ``execute`` on the execution pool and waits for it for at
        most ``config.timeout_seconds``, recording TIMED_OUT on expiry.

        Args:
            task: The task description for the subagent.
            task_id: Optional task ID to use. If not provided, the first 8
                characters of a random UUID are used.

        Returns:
            Task ID that can be used to check status later.
        """
        if task_id is None:
            task_id = str(uuid.uuid4())[:8]

        result = SubagentResult(
            task_id=task_id,
            trace_id=self.trace_id,
            status=SubagentStatus.PENDING,
        )

        logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} starting async execution, task_id={task_id}, timeout={self.config.timeout_seconds}s")

        with _background_tasks_lock:
            _background_tasks[task_id] = result

        def run_task():
            # All updates go to the `result` object captured above rather than
            # to whatever the registry currently maps task_id to, so a reused
            # task_id or a removed entry cannot redirect or break this run.
            with _background_tasks_lock:
                result.status = SubagentStatus.RUNNING
                result.started_at = datetime.now()

            try:
                execution_future: Future = _execution_pool.submit(self.execute, task, result)
                try:
                    exec_result = execution_future.result(timeout=self.config.timeout_seconds)
                except FuturesTimeoutError:
                    logger.error(f"[trace={self.trace_id}] Subagent {self.config.name} execution timed out after {self.config.timeout_seconds}s")
                    with _background_tasks_lock:
                        result.status = SubagentStatus.TIMED_OUT
                        result.error = f"Execution timed out after {self.config.timeout_seconds} seconds"
                        result.completed_at = datetime.now()
                    # cancel() only prevents a still-queued call from starting;
                    # a call that is already running continues, and its late
                    # outcome is discarded by _finalize.
                    execution_future.cancel()
                else:
                    with _background_tasks_lock:
                        result.status = exec_result.status
                        result.result = exec_result.result
                        result.error = exec_result.error
                        result.completed_at = datetime.now()
                        result.ai_messages = exec_result.ai_messages
            except Exception as e:
                logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} async execution failed")
                with _background_tasks_lock:
                    result.status = SubagentStatus.FAILED
                    result.error = str(e)
                    result.completed_at = datetime.now()

        _scheduler_pool.submit(run_task)
        return task_id
|
|
|
|
# Not referenced anywhere in this file; presumably imported by callers to cap
# how many subagents they launch at once. The value equals the max_workers=3
# used for both thread pools in this module - TODO confirm they are meant to
# stay in sync.
MAX_CONCURRENT_SUBAGENTS = 3
|
|
|
|
def get_background_task_result(task_id: str) -> SubagentResult | None:
    """Look up a background task by its ID.

    Args:
        task_id: The ID that execute_async returned for the task.

    Returns:
        The registered SubagentResult, or None when no task has that ID.
    """
    # The registry lookup happens under the lock; the stored object itself
    # (not a copy) is handed back.
    with _background_tasks_lock:
        found = _background_tasks.get(task_id)
    return found
|
|
|
|
def list_background_tasks() -> list[SubagentResult]:
    """Return every registered background task.

    Returns:
        A new list holding all SubagentResult instances in the registry.
    """
    # Copy the values while holding the lock so the caller gets a list that
    # later registry changes do not alter.
    with _background_tasks_lock:
        snapshot = [*_background_tasks.values()]
    return snapshot
|
|