File size: 16,506 Bytes
033ca06 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 | """Subagent execution engine."""
import logging
import threading
import uuid
from concurrent.futures import Future, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeoutError
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any
from langchain.agents import create_agent
from langchain.tools import BaseTool
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.runnables import RunnableConfig
from src.agents.thread_state import SandboxState, ThreadDataState, ThreadState
from src.models import create_chat_model
from src.subagents.config import SubagentConfig
logger = logging.getLogger(__name__)
class SubagentStatus(Enum):
"""Status of a subagent execution."""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
TIMED_OUT = "timed_out"
@dataclass
class SubagentResult:
"""Result of a subagent execution.
Attributes:
task_id: Unique identifier for this execution.
trace_id: Trace ID for distributed tracing (links parent and subagent logs).
status: Current status of the execution.
result: The final result message (if completed).
error: Error message (if failed).
started_at: When execution started.
completed_at: When execution completed.
ai_messages: List of complete AI messages (as dicts) generated during execution.
"""
task_id: str
trace_id: str
status: SubagentStatus
result: str | None = None
error: str | None = None
started_at: datetime | None = None
completed_at: datetime | None = None
ai_messages: list[dict[str, Any]] | None = None
def __post_init__(self):
"""Initialize mutable defaults."""
if self.ai_messages is None:
self.ai_messages = []
# Global storage for background task results
_background_tasks: dict[str, SubagentResult] = {}
_background_tasks_lock = threading.Lock()
# Thread pool for background task scheduling and orchestration
_scheduler_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-scheduler-")
# Thread pool for actual subagent execution (with timeout support)
# Larger pool to avoid blocking when scheduler submits execution tasks
_execution_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-exec-")
def _filter_tools(
all_tools: list[BaseTool],
allowed: list[str] | None,
disallowed: list[str] | None,
) -> list[BaseTool]:
"""Filter tools based on subagent configuration.
Args:
all_tools: List of all available tools.
allowed: Optional allowlist of tool names. If provided, only these tools are included.
disallowed: Optional denylist of tool names. These tools are always excluded.
Returns:
Filtered list of tools.
"""
filtered = all_tools
# Apply allowlist if specified
if allowed is not None:
allowed_set = set(allowed)
filtered = [t for t in filtered if t.name in allowed_set]
# Apply denylist
if disallowed is not None:
disallowed_set = set(disallowed)
filtered = [t for t in filtered if t.name not in disallowed_set]
return filtered
def _get_model_name(config: SubagentConfig, parent_model: str | None) -> str | None:
"""Resolve the model name for a subagent.
Args:
config: Subagent configuration.
parent_model: The parent agent's model name.
Returns:
Model name to use, or None to use default.
"""
if config.model == "inherit":
return parent_model
return config.model
class SubagentExecutor:
"""Executor for running subagents."""
def __init__(
self,
config: SubagentConfig,
tools: list[BaseTool],
parent_model: str | None = None,
sandbox_state: SandboxState | None = None,
thread_data: ThreadDataState | None = None,
thread_id: str | None = None,
trace_id: str | None = None,
):
"""Initialize the executor.
Args:
config: Subagent configuration.
tools: List of all available tools (will be filtered).
parent_model: The parent agent's model name for inheritance.
sandbox_state: Sandbox state from parent agent.
thread_data: Thread data from parent agent.
thread_id: Thread ID for sandbox operations.
trace_id: Trace ID from parent for distributed tracing.
"""
self.config = config
self.parent_model = parent_model
self.sandbox_state = sandbox_state
self.thread_data = thread_data
self.thread_id = thread_id
# Generate trace_id if not provided (for top-level calls)
self.trace_id = trace_id or str(uuid.uuid4())[:8]
# Filter tools based on config
self.tools = _filter_tools(
tools,
config.tools,
config.disallowed_tools,
)
logger.info(f"[trace={self.trace_id}] SubagentExecutor initialized: {config.name} with {len(self.tools)} tools")
def _create_agent(self):
"""Create the agent instance."""
model_name = _get_model_name(self.config, self.parent_model)
model = create_chat_model(name=model_name, thinking_enabled=False)
# Subagents need minimal middlewares to ensure tools can access sandbox and thread_data
# These middlewares will reuse the sandbox/thread_data from parent agent
from src.agents.middlewares.thread_data_middleware import ThreadDataMiddleware
from src.sandbox.middleware import SandboxMiddleware
middlewares = [
ThreadDataMiddleware(lazy_init=True), # Compute thread paths
SandboxMiddleware(lazy_init=True), # Reuse parent's sandbox (no re-acquisition)
]
return create_agent(
model=model,
tools=self.tools,
middleware=middlewares,
system_prompt=self.config.system_prompt,
state_schema=ThreadState,
)
def _build_initial_state(self, task: str) -> dict[str, Any]:
"""Build the initial state for agent execution.
Args:
task: The task description.
Returns:
Initial state dictionary.
"""
state: dict[str, Any] = {
"messages": [HumanMessage(content=task)],
}
# Pass through sandbox and thread data from parent
if self.sandbox_state is not None:
state["sandbox"] = self.sandbox_state
if self.thread_data is not None:
state["thread_data"] = self.thread_data
return state
def execute(self, task: str, result_holder: SubagentResult | None = None) -> SubagentResult:
"""Execute a task synchronously.
Args:
task: The task description for the subagent.
result_holder: Optional pre-created result object to update during execution.
Returns:
SubagentResult with the execution result.
"""
if result_holder is not None:
# Use the provided result holder (for async execution with real-time updates)
result = result_holder
else:
# Create a new result for synchronous execution
task_id = str(uuid.uuid4())[:8]
result = SubagentResult(
task_id=task_id,
trace_id=self.trace_id,
status=SubagentStatus.RUNNING,
started_at=datetime.now(),
)
try:
agent = self._create_agent()
state = self._build_initial_state(task)
# Build config with thread_id for sandbox access and recursion limit
run_config: RunnableConfig = {
"recursion_limit": self.config.max_turns,
}
context = {}
if self.thread_id:
run_config["configurable"] = {"thread_id": self.thread_id}
context["thread_id"] = self.thread_id
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} starting execution with max_turns={self.config.max_turns}")
# Use stream instead of invoke to get real-time updates
# This allows us to collect AI messages as they are generated
final_state = None
for chunk in agent.stream(state, config=run_config, context=context, stream_mode="values"): # type: ignore[arg-type]
final_state = chunk
# Extract AI messages from the current state
messages = chunk.get("messages", [])
if messages:
last_message = messages[-1]
# Check if this is a new AI message
if isinstance(last_message, AIMessage):
# Convert message to dict for serialization
message_dict = last_message.model_dump()
# Only add if it's not already in the list (avoid duplicates)
# Check by comparing message IDs if available, otherwise compare full dict
message_id = message_dict.get("id")
is_duplicate = False
if message_id:
is_duplicate = any(msg.get("id") == message_id for msg in result.ai_messages)
else:
is_duplicate = message_dict in result.ai_messages
if not is_duplicate:
result.ai_messages.append(message_dict)
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} captured AI message #{len(result.ai_messages)}")
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} completed execution")
if final_state is None:
logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no final state")
result.result = "No response generated"
else:
# Extract the final message - find the last AIMessage
messages = final_state.get("messages", [])
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} final messages count: {len(messages)}")
# Find the last AIMessage in the conversation
last_ai_message = None
for msg in reversed(messages):
if isinstance(msg, AIMessage):
last_ai_message = msg
break
if last_ai_message is not None:
content = last_ai_message.content
# Handle both str and list content types for the final result
if isinstance(content, str):
result.result = content
elif isinstance(content, list):
# Extract text from list of content blocks for final result only
text_parts = []
for block in content:
if isinstance(block, str):
text_parts.append(block)
elif isinstance(block, dict) and "text" in block:
text_parts.append(block["text"])
result.result = "\n".join(text_parts) if text_parts else "No text content in response"
else:
result.result = str(content)
elif messages:
# Fallback: use the last message if no AIMessage found
last_message = messages[-1]
logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no AIMessage found, using last message: {type(last_message)}")
result.result = str(last_message.content) if hasattr(last_message, "content") else str(last_message)
else:
logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no messages in final state")
result.result = "No response generated"
result.status = SubagentStatus.COMPLETED
result.completed_at = datetime.now()
except Exception as e:
logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} execution failed")
result.status = SubagentStatus.FAILED
result.error = str(e)
result.completed_at = datetime.now()
return result
def execute_async(self, task: str, task_id: str | None = None) -> str:
"""Start a task execution in the background.
Args:
task: The task description for the subagent.
task_id: Optional task ID to use. If not provided, a random UUID will be generated.
Returns:
Task ID that can be used to check status later.
"""
# Use provided task_id or generate a new one
if task_id is None:
task_id = str(uuid.uuid4())[:8]
# Create initial pending result
result = SubagentResult(
task_id=task_id,
trace_id=self.trace_id,
status=SubagentStatus.PENDING,
)
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} starting async execution, task_id={task_id}, timeout={self.config.timeout_seconds}s")
with _background_tasks_lock:
_background_tasks[task_id] = result
# Submit to scheduler pool
def run_task():
with _background_tasks_lock:
_background_tasks[task_id].status = SubagentStatus.RUNNING
_background_tasks[task_id].started_at = datetime.now()
result_holder = _background_tasks[task_id]
try:
# Submit execution to execution pool with timeout
# Pass result_holder so execute() can update it in real-time
execution_future: Future = _execution_pool.submit(self.execute, task, result_holder)
try:
# Wait for execution with timeout
exec_result = execution_future.result(timeout=self.config.timeout_seconds)
with _background_tasks_lock:
_background_tasks[task_id].status = exec_result.status
_background_tasks[task_id].result = exec_result.result
_background_tasks[task_id].error = exec_result.error
_background_tasks[task_id].completed_at = datetime.now()
_background_tasks[task_id].ai_messages = exec_result.ai_messages
except FuturesTimeoutError:
logger.error(f"[trace={self.trace_id}] Subagent {self.config.name} execution timed out after {self.config.timeout_seconds}s")
with _background_tasks_lock:
_background_tasks[task_id].status = SubagentStatus.TIMED_OUT
_background_tasks[task_id].error = f"Execution timed out after {self.config.timeout_seconds} seconds"
_background_tasks[task_id].completed_at = datetime.now()
# Cancel the future (best effort - may not stop the actual execution)
execution_future.cancel()
except Exception as e:
logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} async execution failed")
with _background_tasks_lock:
_background_tasks[task_id].status = SubagentStatus.FAILED
_background_tasks[task_id].error = str(e)
_background_tasks[task_id].completed_at = datetime.now()
_scheduler_pool.submit(run_task)
return task_id
MAX_CONCURRENT_SUBAGENTS = 3
def get_background_task_result(task_id: str) -> SubagentResult | None:
"""Get the result of a background task.
Args:
task_id: The task ID returned by execute_async.
Returns:
SubagentResult if found, None otherwise.
"""
with _background_tasks_lock:
return _background_tasks.get(task_id)
def list_background_tasks() -> list[SubagentResult]:
"""List all background tasks.
Returns:
List of all SubagentResult instances.
"""
with _background_tasks_lock:
return list(_background_tasks.values())
|