Spaces:
Runtime error
Runtime error
| """ | |
| LLM Router Node for LangGraph Workflow | |
| This module provides the main LLM router node with a single, intelligent routing step. | |
| Purpose: | |
| - Replace intent_parsing_node + tool_planning_node with single LLM call | |
| - Integrate LLM client and validator for intelligent routing | |
| - Maintain compatibility with existing AgentState interface | |
| - Provide clear error handling and fallback mechanisms | |
| Architecture: | |
| - llm_router_node: Main LangGraph node function | |
| - RouterOrchestrator: Coordinates LLM client and validator | |
| - Error handling: Graceful degradation when LLM or validation fails | |
| - State management: Clean updates to AgentState structure | |
| Design Principles: | |
| - Single responsibility: Route query to appropriate tool | |
| - Fail-safe operation: Always return valid state, never crash | |
| - Transparent decisions: Log all routing decisions clearly | |
| - Performance oriented: Fast validation, reasonable LLM timeouts | |
| Integration Points: | |
| - Uses LLMClient for intelligent query analysis | |
| - Uses ProposalValidator for safety validation | |
| - Updates AgentState for downstream tool execution | |
| - Integrates with existing tool registry system | |
| Future Extensions: | |
| - TODO: Add conversation context integration for multi-turn dialogues | |
| - TODO: Add multi-tool coordination for complex scenarios | |
| - TODO: Add RAG integration for emergency procedure queries | |
| - TODO: Add performance monitoring and LLM routing analytics | |
| - TODO: Add A/B testing support for prompt optimization | |
| """ | |
| import asyncio | |
| import logging | |
| from typing import Dict, Any, List, Optional | |
| import yaml | |
| import os | |
| from pathlib import Path | |
| from datetime import datetime | |
| from .state import AgentState, ToolCall, update_processing_status, add_error | |
| from .llm_client import LLMClient, ToolProposal, LLMRoutingError | |
| from .validator import ProposalValidator, ValidationResult, ValidationOutcome | |
| from .registry import get_tool_registry | |
| # Load LLM router config | |
| def _load_llm_router_config(): | |
| config_path = Path(__file__).parent / "config" / "llm_router_config.yaml" | |
| try: | |
| with open(config_path, "r") as f: | |
| return yaml.safe_load(f) | |
| except Exception as e: | |
| logging.warning(f"Could not load llm_router_config.yaml: {e}") | |
| return {} | |
| LLM_ROUTER_CONFIG = _load_llm_router_config() | |
| class RouterOrchestrator: | |
| """ | |
| Orchestrates LLM routing with validation | |
| This class coordinates the LLM client and proposal validator to provide | |
| safe, validated tool routing decisions. It serves as the main interface | |
| between the agent workflow and the LLM routing system. | |
| """ | |
| def __init__( | |
| self, | |
| llm_provider: Optional[str] = None, | |
| llm_model: Optional[str] = None, | |
| llm_temperature: float = 0.1, | |
| llm_timeout: int = 10, | |
| fallback_provider: Optional[str] = None, | |
| fallback_model: Optional[str] = None | |
| ): | |
| """Initialize router with configurable LLM client and validator with fallback""" | |
| # Load config if parameters not provided | |
| if llm_provider is None or llm_model is None: | |
| llm_config = LLM_ROUTER_CONFIG.get("llm", {}) | |
| llm_provider = llm_provider or llm_config.get("provider") | |
| llm_model = llm_model or llm_config.get("model") | |
| fallback_config = llm_config.get("fallback", {}) | |
| fallback_provider = fallback_provider or fallback_config.get("provider") | |
| fallback_model = fallback_model or fallback_config.get("model") | |
| self.llm_client = LLMClient( | |
| provider=llm_provider, | |
| model=llm_model, | |
| temperature=llm_temperature, | |
| timeout=llm_timeout, | |
| fallback_provider=fallback_provider, | |
| fallback_model=fallback_model | |
| ) | |
| self.validator = ProposalValidator() | |
| self.tool_registry = get_tool_registry() | |
| # Load configuration files | |
| self._load_configurations() | |
| logging.info(f"🎯 RouterOrchestrator initialized with {llm_provider}/{llm_model}") | |
| def _load_configurations(self): | |
| """Load global configurations and tool-specific configs""" | |
| try: | |
| # Load global geography configuration | |
| config_dir = Path(__file__).parent / "config" | |
| geography_path = config_dir / "geography.yaml" | |
| if geography_path.exists(): | |
| with open(geography_path, 'r', encoding='utf-8') as f: | |
| self.geography_config = yaml.safe_load(f) | |
| logging.info("📍 Loaded geography configuration") | |
| else: | |
| self.geography_config = None | |
| logging.warning("📍 Geography configuration not found") | |
| # Load tool registry configuration | |
| tool_registry_path = config_dir / "tool_registry.yaml" | |
| if tool_registry_path.exists(): | |
| with open(tool_registry_path, 'r', encoding='utf-8') as f: | |
| self.tool_registry_config = yaml.safe_load(f) | |
| logging.info("🔧 Loaded tool registry configuration") | |
| else: | |
| self.tool_registry_config = None | |
| logging.warning("🔧 Tool registry configuration not found") | |
| # Load OMIRL sensor types | |
| self._load_omirl_config() | |
| except Exception as e: | |
| logging.error(f"❌ Error loading configurations: {e}") | |
| self.geography_config = None | |
| self.tool_registry_config = None | |
| self.omirl_config = None | |
| def _load_omirl_config(self): | |
| """Load OMIRL-specific configuration""" | |
| try: | |
| omirl_config_path = Path(__file__).parent.parent / "tools" / "omirl" / "config" / "parameters.yaml" | |
| if omirl_config_path.exists(): | |
| with open(omirl_config_path, 'r', encoding='utf-8') as f: | |
| self.omirl_config = yaml.safe_load(f) | |
| logging.info("🌦️ Loaded OMIRL configuration") | |
| else: | |
| self.omirl_config = None | |
| logging.warning("🌦️ OMIRL configuration not found") | |
| except Exception as e: | |
| logging.error(f"❌ Error loading OMIRL configuration: {e}") | |
| self.omirl_config = None | |
| async def route_query( | |
| self, | |
| user_query: str, | |
| conversation_context: Optional[List[Dict[str, str]]] = None, | |
| user_context: Optional[Dict[str, Any]] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Route user query to appropriate tool with full validation | |
| This is the main entry point for LLM-based routing. It coordinates | |
| the LLM client and validator to produce safe, executable tool calls. | |
| Args: | |
| user_query: User input in Italian or English | |
| conversation_context: Previous conversation history | |
| user_context: User session and metadata | |
| Returns: | |
| Routing result with tool calls or clarification requests | |
| """ | |
| start_time = datetime.now() | |
| logging.info(f"🚀 Routing query: {user_query[:100]}...") | |
| try: | |
| # 1. Get available tools for LLM | |
| available_tools = self._get_available_tools_for_llm() | |
| # 2. Get LLM routing proposal | |
| logging.info("🤖 Requesting LLM routing proposal...") | |
| proposal = await self.llm_client.route_query( | |
| user_query=user_query, | |
| available_tools=available_tools, | |
| conversation_context=conversation_context | |
| ) | |
| # Track which LLM was used | |
| llm_usage = self.llm_client.get_llm_usage_info() | |
| logging.info(f"📝 LLM proposal: {proposal.tool}/{proposal.task} (confidence: {proposal.confidence})") | |
| logging.info(f"🔧 LLM used: {llm_usage['provider']}/{llm_usage['model']}") | |
| # 3. Validate proposal | |
| logging.info("🛡️ Validating LLM proposal...") | |
| validation_outcome = await self.validator.validate_proposal( | |
| proposal=proposal, | |
| user_context=user_context | |
| ) | |
| # 4. Process validation result | |
| routing_time = (datetime.now() - start_time).total_seconds() | |
| if validation_outcome.execution_approved: | |
| logging.info(f"✅ Routing approved ({routing_time:.3f}s): {proposal.tool}") | |
| return self._create_approved_result(proposal, validation_outcome, routing_time, llm_usage) | |
| elif validation_outcome.result == ValidationResult.CLARIFICATION: | |
| logging.info(f"❓ Clarification needed ({routing_time:.3f}s): {validation_outcome.reason}") | |
| return self._create_clarification_result(proposal, validation_outcome, routing_time, llm_usage) | |
| else: # REJECTED | |
| logging.warning(f"❌ Routing rejected ({routing_time:.3f}s): {validation_outcome.reason}") | |
| return self._create_rejection_result(proposal, validation_outcome, routing_time, llm_usage) | |
| except LLMRoutingError as e: | |
| logging.error(f"🚨 LLM routing failed: {e}") | |
| return self._create_error_result(user_query, f"LLM routing error: {str(e)}") | |
| except Exception as e: | |
| logging.error(f"💥 Router orchestrator failed: {e}") | |
| return self._create_error_result(user_query, f"Internal routing error: {str(e)}") | |
| def _get_available_tools_for_llm(self) -> List[Dict[str, Any]]: | |
| """Get tool specifications formatted for LLM prompt with configuration data""" | |
| tools = [] | |
| # Build tools based on tool registry configuration | |
| if self.tool_registry_config and "tools" in self.tool_registry_config: | |
| for tool_name, tool_config in self.tool_registry_config["tools"].items(): | |
| # Skip disabled tools | |
| if not tool_config.get("enabled", True): | |
| continue | |
| # Build formatted tool with configuration data | |
| formatted_tool = { | |
| "name": tool_name, | |
| "display_name": tool_config.get("display_name", tool_name), | |
| "description": tool_config.get("description", ""), | |
| "enabled": tool_config.get("enabled", True), | |
| "tasks": tool_config.get("tasks", {}), | |
| } | |
| # Add geography data for all tools | |
| if self.geography_config: | |
| # Extract Liguria region data for compatibility with prompt functions | |
| liguria_data = self.geography_config.get("regions", {}).get("liguria", {}) | |
| formatted_tool["geography_data"] = liguria_data | |
| # Add sensor types for OMIRL tool | |
| if tool_name == "omirl_tool" and self.omirl_config: | |
| sensor_types = self.omirl_config.get("sensor_types", []) | |
| formatted_tool["sensor_types"] = sensor_types | |
| tools.append(formatted_tool) | |
| else: | |
| # Fallback to registry tools if no configuration | |
| registry_tools = self.tool_registry.get_tool_specs_for_llm() | |
| for tool_spec in registry_tools: | |
| formatted_tool = { | |
| "name": tool_spec["name"], | |
| "description": tool_spec["description"], | |
| "parameters": tool_spec["parameters"], | |
| "geographic_scope": "Liguria (Genova, Savona, Imperia, La Spezia)" | |
| } | |
| # Add geography data if available | |
| if self.geography_config: | |
| # Extract Liguria region data for compatibility with prompt functions | |
| liguria_data = self.geography_config.get("regions", {}).get("liguria", {}) | |
| formatted_tool["geography_data"] = liguria_data | |
| tools.append(formatted_tool) | |
| return tools | |
| def _create_approved_result( | |
| self, | |
| proposal: ToolProposal, | |
| validation: ValidationOutcome, | |
| routing_time: float, | |
| llm_usage: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """Create result for approved routing""" | |
| # Use modified parameters if validator made changes | |
| final_params = validation.modified_params or proposal.params | |
| tool_call = ToolCall( | |
| tool_name=proposal.tool, | |
| task=proposal.task, | |
| parameters=final_params, | |
| reason=proposal.reasoning | |
| ) | |
| return { | |
| "status": "approved", | |
| "tool_calls": [tool_call], | |
| "routing_confidence": proposal.confidence, | |
| "validation_confidence": validation.confidence, | |
| "routing_time_ms": int(routing_time * 1000), | |
| "llm_reasoning": proposal.reasoning, | |
| "validation_reason": validation.reason, | |
| "warnings": validation.warnings or [], | |
| "execution_ready": True, | |
| "llm_usage": llm_usage # Add LLM usage tracking | |
| } | |
| def _create_clarification_result( | |
| self, | |
| proposal: ToolProposal, | |
| validation: ValidationOutcome, | |
| routing_time: float, | |
| llm_usage: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """Create result for clarification request""" | |
| # Use LLM's clarification if available, otherwise validator's | |
| clarification_text = ( | |
| proposal.clarification_request or | |
| getattr(validation, 'suggested_clarification', None) or | |
| validation.reason | |
| ) | |
| return { | |
| "status": "clarification_needed", | |
| "clarification_request": clarification_text, | |
| "routing_confidence": proposal.confidence, | |
| "validation_confidence": validation.confidence, | |
| "routing_time_ms": int(routing_time * 1000), | |
| "llm_reasoning": proposal.reasoning, | |
| "validation_reason": validation.reason, | |
| "suggested_action": validation.suggested_action, | |
| "execution_ready": False, | |
| "llm_usage": llm_usage | |
| } | |
| def _create_rejection_result( | |
| self, | |
| proposal: ToolProposal, | |
| validation: ValidationOutcome, | |
| routing_time: float, | |
| llm_usage: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """Create result for rejected routing""" | |
| return { | |
| "status": "rejected", | |
| "rejection_reason": validation.reason, | |
| "routing_confidence": proposal.confidence, | |
| "validation_confidence": validation.confidence, | |
| "routing_time_ms": int(routing_time * 1000), | |
| "llm_reasoning": proposal.reasoning, | |
| "validation_reason": validation.reason, | |
| "suggested_action": validation.suggested_action, | |
| "execution_ready": False, | |
| "llm_usage": llm_usage | |
| } | |
| def _create_error_result(self, user_query: str, error_message: str) -> Dict[str, Any]: | |
| """Create result for routing errors""" | |
| return { | |
| "status": "error", | |
| "error_message": error_message, | |
| "original_query": user_query, | |
| "routing_confidence": 0.0, | |
| "validation_confidence": 0.0, | |
| "fallback_message": "Mi dispiace, si è verificato un errore nel processare la tua richiesta. Puoi riprovare?", | |
| "execution_ready": False | |
| } | |
| # Global router instance for node function | |
| _router = None | |
| def reset_router(): | |
| """Reset global router instance (useful for testing)""" | |
| global _router | |
| _router = None | |
| def get_router() -> RouterOrchestrator: | |
| """Get or create global router instance - config loaded from llm_router_config.yaml""" | |
| global _router | |
| if _router is None: | |
| # RouterOrchestrator will load config from yaml automatically | |
| _router = RouterOrchestrator() | |
| logging.info("🚀 Router initialized from config") | |
| return _router | |
| async def llm_router_node(state: AgentState) -> AgentState: | |
| """ | |
| LLM Router Node for LangGraph Workflow | |
| This node replaces the complex intent_parsing_node + tool_planning_node | |
| with a single intelligent routing step using LLM + deterministic validation. | |
| Process: | |
| 1. Extract user query from state | |
| 2. Route query through LLM client | |
| 3. Validate routing proposal | |
| 4. Update state with results | |
| Args: | |
| state: Current agent state with user_message | |
| Returns: | |
| Updated state with routing results | |
| State Updates: | |
| - routing_result: Complete routing decision and metadata | |
| - planned_tools: Tool calls if routing approved | |
| - agent_response: Clarification/rejection message if needed | |
| - errors: Any routing errors encountered | |
| """ | |
| logging.info("🎯 LLM Router Node: Starting intelligent routing...") | |
| # Update processing status | |
| state = update_processing_status(state, "llm_routing", "Analyzing query with LLM router") | |
| try: | |
| # Extract user query | |
| user_query = state.get("user_message", "").strip() | |
| if not user_query: | |
| logging.warning("⚠️ Empty user query received") | |
| state = add_error(state, "Empty user query", "llm_routing") | |
| state["agent_response"] = "Mi dispiace, non ho ricevuto nessuna richiesta. Puoi ripetere?" | |
| return state | |
| # Get conversation context if available | |
| conversation_context = state.get("conversation_history", []) | |
| # TODO: Extract user context from state | |
| user_context = { | |
| "session_id": state.get("session_id"), | |
| "user_id": state.get("user_id"), | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| # Route query | |
| router = get_router() | |
| routing_result = await router.route_query( | |
| user_query=user_query, | |
| conversation_context=conversation_context, | |
| user_context=user_context | |
| ) | |
| # Update state based on routing result | |
| state["routing_result"] = routing_result | |
| # Track LLM usage if available | |
| if "llm_usage" in routing_result: | |
| if "llm_usage" not in state: | |
| state["llm_usage"] = {} | |
| state["llm_usage"]["routing"] = routing_result["llm_usage"] | |
| if routing_result["status"] == "approved": | |
| # Ready for tool execution | |
| state["planned_tools"] = routing_result["tool_calls"] | |
| state = update_processing_status(state, "approved", "Tools planned for execution") | |
| logging.info(f"✅ Tool execution planned: {len(routing_result['tool_calls'])} tools") | |
| elif routing_result["status"] == "clarification_needed": | |
| # Need clarification from user | |
| state["clarification_request"] = routing_result["clarification_request"] | |
| state = update_processing_status(state, "needs_clarification", "Clarification needed") | |
| logging.info("❓ Clarification requested from user") | |
| elif routing_result["status"] == "rejected": | |
| # Request rejected with explanation | |
| state["rejection_reason"] = routing_result["rejection_reason"] | |
| state = update_processing_status(state, "rejected", "Request rejected") | |
| logging.info("❌ Request rejected by validator") | |
| else: # error | |
| # Routing error - provide fallback | |
| state = add_error(state, routing_result["error_message"], "llm_routing") | |
| state = update_processing_status(state, "error", "Routing error occurred") | |
| logging.error("💥 Routing error occurred") | |
| # Add performance metrics | |
| routing_time = routing_result.get("routing_time_ms", 0) | |
| state.setdefault("performance_metrics", {})["llm_routing_ms"] = routing_time | |
| logging.info(f"🎯 LLM Router Node completed ({routing_time}ms)") | |
| return state | |
| except Exception as e: | |
| logging.error(f"💥 LLM Router Node failed: {e}") | |
| state = add_error(state, f"LLM router node error: {str(e)}", "llm_routing") | |
| state["agent_response"] = "Mi dispiace, si è verificato un errore tecnico. Puoi riprovare?" | |
| return state | |
| # TODO: Future conversation context integration | |
| async def llm_router_with_context_node(state: AgentState) -> AgentState: | |
| """ | |
| Enhanced LLM router with conversation context integration | |
| This future version will better handle multi-turn conversations | |
| and maintain context across multiple queries in a session. | |
| """ | |
| pass | |
| # TODO: Future emergency priority router | |
| async def emergency_priority_router_node(state: AgentState) -> AgentState: | |
| """ | |
| Emergency-aware router with priority handling | |
| This future version will detect emergency scenarios and | |
| provide fast-track routing with reduced validation delays. | |
| """ | |
| pass | |
| # TODO: Future multi-tool coordination router | |
| async def multi_tool_coordination_router_node(state: AgentState) -> AgentState: | |
| """ | |
| Router capable of coordinating multiple tools | |
| This future version will handle complex scenarios requiring | |
| multiple data sources (OMIRL + bulletins + traffic). | |
| """ | |
| pass | |