| """Graph runner - executes the agent graph.
|
|
|
| The runner orchestrates node execution, manages state transitions,
|
| and yields status updates for streaming.
|
|
|
| Uses timeout-based execution instead of fixed iteration count.
|
| """
|
|
|
| import logging
|
| import time
|
| from typing import AsyncGenerator, Dict, Type
|
|
|
| from app.agents.graph.state import AgentState, NodeType
|
| from app.agents.graph.nodes import (
|
| BaseNode,
|
| PlanNode,
|
| SearchNode,
|
| NavigateNode,
|
| ExtractNode,
|
| VerifyNode,
|
| RespondNode,
|
| )
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


# Dispatch table used by the runner: for each node type, the node class
# that gets instantiated and executed when the graph reaches that type.
NODE_REGISTRY: Dict[NodeType, Type[BaseNode]] = {
    NodeType.PLAN: PlanNode,
    NodeType.SEARCH: SearchNode,
    NodeType.NAVIGATE: NavigateNode,
    NodeType.EXTRACT: ExtractNode,
    NodeType.VERIFY: VerifyNode,
    NodeType.RESPOND: RespondNode,
}
|
|
|
|
|
# Default status line streamed to the client just before a node of the
# given type runs.
#
# The emoji prefixes are written as \N{...} escapes so the source stays pure
# ASCII: the previous literals had been mis-decoded into stray Greek letters,
# and the RESPOND entry had a raw line break inside its string literal,
# which is a syntax error.
# NOTE(review): the PLAN, VERIFY and RESPOND glyphs are recoverable from the
# garbled bytes; the SEARCH, NAVIGATE and EXTRACT glyphs are a best guess --
# confirm against the intended UI.
STATUS_MESSAGES = {
    NodeType.PLAN: "\N{DIRECT HIT} Planning task...",
    NodeType.SEARCH: "\N{LEFT-POINTING MAGNIFYING GLASS} Searching...",
    NodeType.NAVIGATE: "\N{GLOBE WITH MERIDIANS} Navigating...",
    NodeType.EXTRACT: "\N{PAGE FACING UP} Extracting content...",
    NodeType.VERIFY: "\N{THINKING FACE} Analyzing...",
    NodeType.RESPOND: "\N{WHITE HEAVY CHECK MARK} Generating response...",
}
|
|
|
|
|
# When a step fails with less than this many seconds of budget left, the
# runner skips further searching and goes straight to response generation.
_MIN_RETRY_BUDGET_SECONDS = 30


def _build_status_message(state: AgentState, node_type: NodeType) -> str:
    """Return the status line to stream before a node of ``node_type`` runs.

    SEARCH and NAVIGATE get a line that names the query / URL (truncated to
    40 characters); every other case falls back to ``STATUS_MESSAGES``.
    """
    # NOTE(review): the emoji escapes below replace mis-encoded literals;
    # the exact original glyphs are a best guess -- confirm against the UI.
    if node_type == NodeType.SEARCH and state.plan.get("steps"):
        for step in state.plan["steps"]:
            if step.get("action") == "search":
                # ``or`` also covers an explicit None/empty query, which
                # would otherwise make the slice below raise TypeError.
                query = step.get("query") or state.task
                return f"\U0001F50D Searching: {query[:40]}..."
    elif node_type == NodeType.NAVIGATE and state.url:
        return f"\U0001F310 Navigating to {state.url[:40]}..."
    return STATUS_MESSAGES.get(node_type, "Processing...")


async def run_graph(state: AgentState) -> AsyncGenerator[dict, None]:
    """Run the agent graph and yield status updates.

    Starts at the PLAN node and keeps executing whichever node type the
    previous node returned, for as long as ``state.should_continue()`` holds.
    Execution stops early once a RESPOND node has run or when a node type has
    no entry in ``NODE_REGISTRY``. If the loop ends without a final result and
    without success, a ``RespondNode`` is executed once more to force an
    answer.

    Errors raised by a node are logged and recorded via ``state.add_error``
    instead of being propagated; the next node is then SEARCH, or RESPOND when
    fewer than ``_MIN_RETRY_BUDGET_SECONDS`` seconds remain.

    Args:
        state: Initial agent state with task, url, and desktop.

    Yields:
        ``{"type": "status", "message": ...}`` before every step, then one
        ``{"type": "result", "content", "links", "success"}`` event (links
        are the first 10 visited URLs), then one
        ``{"type": "complete", "message": ...}`` event.
    """
    state.start_time = time.time()
    current_node_type = NodeType.PLAN
    state.current_node = current_node_type

    logger.info(
        "Starting graph execution for task: %s, timeout: %ss",
        state.task[:50],
        state.timeout_seconds,
    )

    while state.should_continue():
        state.step_count += 1
        state.current_node = current_node_type

        node_class = NODE_REGISTRY.get(current_node_type)
        if node_class is None:
            logger.error("Unknown node type: %s", current_node_type)
            break

        node = node_class()

        remaining = int(state.get_remaining_time())
        status_msg = _build_status_message(state, current_node_type)

        yield {
            "type": "status",
            "message": f"{status_msg} (step {state.step_count}, {remaining}s remaining)",
        }

        try:
            state, next_node_type = await node.execute(state)
            # Sample the clock after the node ran so the log reflects the
            # time actually spent, not the time at the start of the step.
            logger.info(
                "Step %s: %s -> %s (%ss elapsed)",
                state.step_count,
                current_node_type.value,
                next_node_type.value,
                int(state.get_elapsed_time()),
            )

            # RESPOND is the terminal node: its successor is ignored.
            if current_node_type == NodeType.RESPOND:
                break

            current_node_type = next_node_type

        except Exception as e:
            # Boundary handler: a failing node must not kill the stream.
            logger.exception("Node execution failed: %s", e)
            state.add_error(str(e))

            if state.get_remaining_time() < _MIN_RETRY_BUDGET_SECONDS:
                current_node_type = NodeType.RESPOND
            else:
                current_node_type = NodeType.SEARCH

    if not state.final_result and not state.success:
        logger.warning("Timeout reached, forcing response generation")
        try:
            state, _ = await RespondNode().execute(state)
        except Exception as e:
            # Same policy as inside the loop: record the failure and still
            # emit the result/complete events so the consumer is not left
            # with a stream that ends without a terminal event.
            logger.exception("Forced response generation failed: %s", e)
            state.add_error(str(e))

    # Logged before the final yields: code placed after the last yield only
    # runs if the consumer asks for another item.
    logger.info(
        "Graph execution complete. Success: %s, Steps: %s, Time: %.1fs",
        state.success,
        state.step_count,
        state.get_elapsed_time(),
    )

    yield {
        "type": "result",
        "content": state.final_result,
        "links": state.visited_urls[:10],
        "success": state.success,
    }

    yield {
        "type": "complete",
        "message": f"Task completed in {int(state.get_elapsed_time())}s",
    }
|
|
|
|
|