| | """Router Node - Decides which specialized agent to use""" |
| | from typing import Dict, Any, Literal |
| | from langchain_core.messages import SystemMessage, HumanMessage |
| | from langchain_groq import ChatGroq |
| | from src.tracing import get_langfuse_callback_handler |
| |
|
| |
|
| | def load_router_prompt() -> str: |
| | """Load the router prompt from file""" |
| | try: |
| | with open("./prompts/router_prompt.txt", "r", encoding="utf-8") as f: |
| | return f.read().strip() |
| | except FileNotFoundError: |
| | return """You are an intelligent agent router. Analyze the query and respond with exactly one of: RETRIEVAL, EXECUTION, or CRITIC""" |
| |
|
| |
|
| | def router_node(state: Dict[str, Any]) -> Dict[str, Any]: |
| | """ |
| | Router node that analyzes the user query and determines which agent should handle it |
| | Returns: next_agent = 'retrieval' | 'execution' | 'critic' |
| | """ |
| | print("Router Node: Analyzing query for agent selection") |
| | |
| | try: |
| | |
| | router_prompt = load_router_prompt() |
| | |
| | |
| | llm = ChatGroq(model="qwen-qwq-32b", temperature=0.0) |
| | |
| | |
| | callback_handler = get_langfuse_callback_handler() |
| | callbacks = [callback_handler] if callback_handler else [] |
| | |
| | |
| | messages = state.get("messages", []) |
| | user_query = None |
| | |
| | for msg in reversed(messages): |
| | if msg.type == "human": |
| | user_query = msg.content |
| | break |
| | |
| | if not user_query: |
| | print("Router Node: No user query found, defaulting to retrieval") |
| | return { |
| | **state, |
| | "next_agent": "retrieval", |
| | "routing_reason": "No user query found" |
| | } |
| | |
| | |
| | routing_messages = [ |
| | SystemMessage(content=router_prompt), |
| | HumanMessage(content=f"Query to route: {user_query}") |
| | ] |
| | |
| | |
| | response = llm.invoke(routing_messages, config={"callbacks": callbacks}) |
| | routing_decision = response.content.strip().upper() |
| | |
| | |
| | next_agent = "retrieval" |
| | if "RETRIEVAL" in routing_decision: |
| | next_agent = "retrieval" |
| | elif "EXECUTION" in routing_decision: |
| | next_agent = "execution" |
| | elif "CRITIC" in routing_decision: |
| | next_agent = "critic" |
| | |
| | print(f"Router Node: Routing to {next_agent} agent (decision: {routing_decision})") |
| | |
| | return { |
| | **state, |
| | "next_agent": next_agent, |
| | "routing_decision": routing_decision, |
| | "routing_reason": f"Query analysis resulted in: {routing_decision}", |
| | "current_step": next_agent |
| | } |
| | |
| | except Exception as e: |
| | print(f"Router Node Error: {e}") |
| | |
| | return { |
| | **state, |
| | "next_agent": "retrieval", |
| | "routing_reason": f"Error in routing: {e}" |
| | } |
| |
|
| |
|
| | def should_route_to_agent(state: Dict[str, Any]) -> Literal["retrieval", "execution", "critic"]: |
| | """ |
| | Conditional edge function that determines which agent to route to |
| | """ |
| | next_agent = state.get("next_agent", "retrieval") |
| | print(f"Routing to: {next_agent}") |
| | return next_agent |