""" LLM Router Node for LangGraph Workflow This module provides the main LLM router node with a single, intelligent routing step. Purpose: - Replace intent_parsing_node + tool_planning_node with single LLM call - Integrate LLM client and validator for intelligent routing - Maintain compatibility with existing AgentState interface - Provide clear error handling and fallback mechanisms Architecture: - llm_router_node: Main LangGraph node function - RouterOrchestrator: Coordinates LLM client and validator - Error handling: Graceful degradation when LLM or validation fails - State management: Clean updates to AgentState structure Design Principles: - Single responsibility: Route query to appropriate tool - Fail-safe operation: Always return valid state, never crash - Transparent decisions: Log all routing decisions clearly - Performance oriented: Fast validation, reasonable LLM timeouts Integration Points: - Uses LLMClient for intelligent query analysis - Uses ProposalValidator for safety validation - Updates AgentState for downstream tool execution - Integrates with existing tool registry system Future Extensions: - TODO: Add conversation context integration for multi-turn dialogues - TODO: Add multi-tool coordination for complex scenarios - TODO: Add RAG integration for emergency procedure queries - TODO: Add performance monitoring and LLM routing analytics - TODO: Add A/B testing support for prompt optimization """ import asyncio import logging from typing import Dict, Any, List, Optional import yaml import os from pathlib import Path from datetime import datetime from .state import AgentState, ToolCall, update_processing_status, add_error from .llm_client import LLMClient, ToolProposal, LLMRoutingError from .validator import ProposalValidator, ValidationResult, ValidationOutcome from .registry import get_tool_registry # Load LLM router config def _load_llm_router_config(): config_path = Path(__file__).parent / "config" / "llm_router_config.yaml" try: with open(config_path, "r") as f: return yaml.safe_load(f) except Exception as e: logging.warning(f"Could not load llm_router_config.yaml: {e}") return {} LLM_ROUTER_CONFIG = _load_llm_router_config() class RouterOrchestrator: """ Orchestrates LLM routing with validation This class coordinates the LLM client and proposal validator to provide safe, validated tool routing decisions. It serves as the main interface between the agent workflow and the LLM routing system. """ def __init__( self, llm_provider: Optional[str] = None, llm_model: Optional[str] = None, llm_temperature: float = 0.1, llm_timeout: int = 10, fallback_provider: Optional[str] = None, fallback_model: Optional[str] = None ): """Initialize router with configurable LLM client and validator with fallback""" # Load config if parameters not provided if llm_provider is None or llm_model is None: llm_config = LLM_ROUTER_CONFIG.get("llm", {}) llm_provider = llm_provider or llm_config.get("provider") llm_model = llm_model or llm_config.get("model") fallback_config = llm_config.get("fallback", {}) fallback_provider = fallback_provider or fallback_config.get("provider") fallback_model = fallback_model or fallback_config.get("model") self.llm_client = LLMClient( provider=llm_provider, model=llm_model, temperature=llm_temperature, timeout=llm_timeout, fallback_provider=fallback_provider, fallback_model=fallback_model ) self.validator = ProposalValidator() self.tool_registry = get_tool_registry() # Load configuration files self._load_configurations() logging.info(f"๐ŸŽฏ RouterOrchestrator initialized with {llm_provider}/{llm_model}") def _load_configurations(self): """Load global configurations and tool-specific configs""" try: # Load global geography configuration config_dir = Path(__file__).parent / "config" geography_path = config_dir / "geography.yaml" if geography_path.exists(): with open(geography_path, 'r', encoding='utf-8') as f: self.geography_config = yaml.safe_load(f) logging.info("๐Ÿ“ Loaded geography configuration") else: self.geography_config = None logging.warning("๐Ÿ“ Geography configuration not found") # Load tool registry configuration tool_registry_path = config_dir / "tool_registry.yaml" if tool_registry_path.exists(): with open(tool_registry_path, 'r', encoding='utf-8') as f: self.tool_registry_config = yaml.safe_load(f) logging.info("๐Ÿ”ง Loaded tool registry configuration") else: self.tool_registry_config = None logging.warning("๐Ÿ”ง Tool registry configuration not found") # Load OMIRL sensor types self._load_omirl_config() except Exception as e: logging.error(f"โŒ Error loading configurations: {e}") self.geography_config = None self.tool_registry_config = None self.omirl_config = None def _load_omirl_config(self): """Load OMIRL-specific configuration""" try: omirl_config_path = Path(__file__).parent.parent / "tools" / "omirl" / "config" / "parameters.yaml" if omirl_config_path.exists(): with open(omirl_config_path, 'r', encoding='utf-8') as f: self.omirl_config = yaml.safe_load(f) logging.info("๐ŸŒฆ๏ธ Loaded OMIRL configuration") else: self.omirl_config = None logging.warning("๐ŸŒฆ๏ธ OMIRL configuration not found") except Exception as e: logging.error(f"โŒ Error loading OMIRL configuration: {e}") self.omirl_config = None async def route_query( self, user_query: str, conversation_context: Optional[List[Dict[str, str]]] = None, user_context: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Route user query to appropriate tool with full validation This is the main entry point for LLM-based routing. It coordinates the LLM client and validator to produce safe, executable tool calls. Args: user_query: User input in Italian or English conversation_context: Previous conversation history user_context: User session and metadata Returns: Routing result with tool calls or clarification requests """ start_time = datetime.now() logging.info(f"๐Ÿš€ Routing query: {user_query[:100]}...") try: # 1. Get available tools for LLM available_tools = self._get_available_tools_for_llm() # 2. Get LLM routing proposal logging.info("๐Ÿค– Requesting LLM routing proposal...") proposal = await self.llm_client.route_query( user_query=user_query, available_tools=available_tools, conversation_context=conversation_context ) # Track which LLM was used llm_usage = self.llm_client.get_llm_usage_info() logging.info(f"๐Ÿ“ LLM proposal: {proposal.tool}/{proposal.task} (confidence: {proposal.confidence})") logging.info(f"๐Ÿ”ง LLM used: {llm_usage['provider']}/{llm_usage['model']}") # 3. Validate proposal logging.info("๐Ÿ›ก๏ธ Validating LLM proposal...") validation_outcome = await self.validator.validate_proposal( proposal=proposal, user_context=user_context ) # 4. Process validation result routing_time = (datetime.now() - start_time).total_seconds() if validation_outcome.execution_approved: logging.info(f"โœ… Routing approved ({routing_time:.3f}s): {proposal.tool}") return self._create_approved_result(proposal, validation_outcome, routing_time, llm_usage) elif validation_outcome.result == ValidationResult.CLARIFICATION: logging.info(f"โ“ Clarification needed ({routing_time:.3f}s): {validation_outcome.reason}") return self._create_clarification_result(proposal, validation_outcome, routing_time, llm_usage) else: # REJECTED logging.warning(f"โŒ Routing rejected ({routing_time:.3f}s): {validation_outcome.reason}") return self._create_rejection_result(proposal, validation_outcome, routing_time, llm_usage) except LLMRoutingError as e: logging.error(f"๐Ÿšจ LLM routing failed: {e}") return self._create_error_result(user_query, f"LLM routing error: {str(e)}") except Exception as e: logging.error(f"๐Ÿ’ฅ Router orchestrator failed: {e}") return self._create_error_result(user_query, f"Internal routing error: {str(e)}") def _get_available_tools_for_llm(self) -> List[Dict[str, Any]]: """Get tool specifications formatted for LLM prompt with configuration data""" tools = [] # Build tools based on tool registry configuration if self.tool_registry_config and "tools" in self.tool_registry_config: for tool_name, tool_config in self.tool_registry_config["tools"].items(): # Skip disabled tools if not tool_config.get("enabled", True): continue # Build formatted tool with configuration data formatted_tool = { "name": tool_name, "display_name": tool_config.get("display_name", tool_name), "description": tool_config.get("description", ""), "enabled": tool_config.get("enabled", True), "tasks": tool_config.get("tasks", {}), } # Add geography data for all tools if self.geography_config: # Extract Liguria region data for compatibility with prompt functions liguria_data = self.geography_config.get("regions", {}).get("liguria", {}) formatted_tool["geography_data"] = liguria_data # Add sensor types for OMIRL tool if tool_name == "omirl_tool" and self.omirl_config: sensor_types = self.omirl_config.get("sensor_types", []) formatted_tool["sensor_types"] = sensor_types tools.append(formatted_tool) else: # Fallback to registry tools if no configuration registry_tools = self.tool_registry.get_tool_specs_for_llm() for tool_spec in registry_tools: formatted_tool = { "name": tool_spec["name"], "description": tool_spec["description"], "parameters": tool_spec["parameters"], "geographic_scope": "Liguria (Genova, Savona, Imperia, La Spezia)" } # Add geography data if available if self.geography_config: # Extract Liguria region data for compatibility with prompt functions liguria_data = self.geography_config.get("regions", {}).get("liguria", {}) formatted_tool["geography_data"] = liguria_data tools.append(formatted_tool) return tools def _create_approved_result( self, proposal: ToolProposal, validation: ValidationOutcome, routing_time: float, llm_usage: Dict[str, Any] ) -> Dict[str, Any]: """Create result for approved routing""" # Use modified parameters if validator made changes final_params = validation.modified_params or proposal.params tool_call = ToolCall( tool_name=proposal.tool, task=proposal.task, parameters=final_params, reason=proposal.reasoning ) return { "status": "approved", "tool_calls": [tool_call], "routing_confidence": proposal.confidence, "validation_confidence": validation.confidence, "routing_time_ms": int(routing_time * 1000), "llm_reasoning": proposal.reasoning, "validation_reason": validation.reason, "warnings": validation.warnings or [], "execution_ready": True, "llm_usage": llm_usage # Add LLM usage tracking } def _create_clarification_result( self, proposal: ToolProposal, validation: ValidationOutcome, routing_time: float, llm_usage: Dict[str, Any] ) -> Dict[str, Any]: """Create result for clarification request""" # Use LLM's clarification if available, otherwise validator's clarification_text = ( proposal.clarification_request or getattr(validation, 'suggested_clarification', None) or validation.reason ) return { "status": "clarification_needed", "clarification_request": clarification_text, "routing_confidence": proposal.confidence, "validation_confidence": validation.confidence, "routing_time_ms": int(routing_time * 1000), "llm_reasoning": proposal.reasoning, "validation_reason": validation.reason, "suggested_action": validation.suggested_action, "execution_ready": False, "llm_usage": llm_usage } def _create_rejection_result( self, proposal: ToolProposal, validation: ValidationOutcome, routing_time: float, llm_usage: Dict[str, Any] ) -> Dict[str, Any]: """Create result for rejected routing""" return { "status": "rejected", "rejection_reason": validation.reason, "routing_confidence": proposal.confidence, "validation_confidence": validation.confidence, "routing_time_ms": int(routing_time * 1000), "llm_reasoning": proposal.reasoning, "validation_reason": validation.reason, "suggested_action": validation.suggested_action, "execution_ready": False, "llm_usage": llm_usage } def _create_error_result(self, user_query: str, error_message: str) -> Dict[str, Any]: """Create result for routing errors""" return { "status": "error", "error_message": error_message, "original_query": user_query, "routing_confidence": 0.0, "validation_confidence": 0.0, "fallback_message": "Mi dispiace, si รจ verificato un errore nel processare la tua richiesta. Puoi riprovare?", "execution_ready": False } # Global router instance for node function _router = None def reset_router(): """Reset global router instance (useful for testing)""" global _router _router = None def get_router() -> RouterOrchestrator: """Get or create global router instance - config loaded from llm_router_config.yaml""" global _router if _router is None: # RouterOrchestrator will load config from yaml automatically _router = RouterOrchestrator() logging.info("๐Ÿš€ Router initialized from config") return _router async def llm_router_node(state: AgentState) -> AgentState: """ LLM Router Node for LangGraph Workflow This node replaces the complex intent_parsing_node + tool_planning_node with a single intelligent routing step using LLM + deterministic validation. Process: 1. Extract user query from state 2. Route query through LLM client 3. Validate routing proposal 4. Update state with results Args: state: Current agent state with user_message Returns: Updated state with routing results State Updates: - routing_result: Complete routing decision and metadata - planned_tools: Tool calls if routing approved - agent_response: Clarification/rejection message if needed - errors: Any routing errors encountered """ logging.info("๐ŸŽฏ LLM Router Node: Starting intelligent routing...") # Update processing status state = update_processing_status(state, "llm_routing", "Analyzing query with LLM router") try: # Extract user query user_query = state.get("user_message", "").strip() if not user_query: logging.warning("โš ๏ธ Empty user query received") state = add_error(state, "Empty user query", "llm_routing") state["agent_response"] = "Mi dispiace, non ho ricevuto nessuna richiesta. Puoi ripetere?" return state # Get conversation context if available conversation_context = state.get("conversation_history", []) # TODO: Extract user context from state user_context = { "session_id": state.get("session_id"), "user_id": state.get("user_id"), "timestamp": datetime.now().isoformat() } # Route query router = get_router() routing_result = await router.route_query( user_query=user_query, conversation_context=conversation_context, user_context=user_context ) # Update state based on routing result state["routing_result"] = routing_result # Track LLM usage if available if "llm_usage" in routing_result: if "llm_usage" not in state: state["llm_usage"] = {} state["llm_usage"]["routing"] = routing_result["llm_usage"] if routing_result["status"] == "approved": # Ready for tool execution state["planned_tools"] = routing_result["tool_calls"] state = update_processing_status(state, "approved", "Tools planned for execution") logging.info(f"โœ… Tool execution planned: {len(routing_result['tool_calls'])} tools") elif routing_result["status"] == "clarification_needed": # Need clarification from user state["clarification_request"] = routing_result["clarification_request"] state = update_processing_status(state, "needs_clarification", "Clarification needed") logging.info("โ“ Clarification requested from user") elif routing_result["status"] == "rejected": # Request rejected with explanation state["rejection_reason"] = routing_result["rejection_reason"] state = update_processing_status(state, "rejected", "Request rejected") logging.info("โŒ Request rejected by validator") else: # error # Routing error - provide fallback state = add_error(state, routing_result["error_message"], "llm_routing") state = update_processing_status(state, "error", "Routing error occurred") logging.error("๐Ÿ’ฅ Routing error occurred") # Add performance metrics routing_time = routing_result.get("routing_time_ms", 0) state.setdefault("performance_metrics", {})["llm_routing_ms"] = routing_time logging.info(f"๐ŸŽฏ LLM Router Node completed ({routing_time}ms)") return state except Exception as e: logging.error(f"๐Ÿ’ฅ LLM Router Node failed: {e}") state = add_error(state, f"LLM router node error: {str(e)}", "llm_routing") state["agent_response"] = "Mi dispiace, si รจ verificato un errore tecnico. Puoi riprovare?" return state # TODO: Future conversation context integration async def llm_router_with_context_node(state: AgentState) -> AgentState: """ Enhanced LLM router with conversation context integration This future version will better handle multi-turn conversations and maintain context across multiple queries in a session. """ pass # TODO: Future emergency priority router async def emergency_priority_router_node(state: AgentState) -> AgentState: """ Emergency-aware router with priority handling This future version will detect emergency scenarios and provide fast-track routing with reduced validation delays. """ pass # TODO: Future multi-tool coordination router async def multi_tool_coordination_router_node(state: AgentState) -> AgentState: """ Router capable of coordinating multiple tools This future version will handle complex scenarios requiring multiple data sources (OMIRL + bulletins + traffic). """ pass