# operations/agent/llm_router_node.py
# Author: jbbove
# Configured Ollama and improved LLM config functionality: the default is now
# Ollama, the fallback is Gemini 2.5 Flash Lite, and these can be changed by
# modifying just one line in the YAML config file.
# Commit: c46d4a9
"""
LLM Router Node for LangGraph Workflow
This module provides the main LLM router node with a single, intelligent routing step.
Purpose:
- Replace intent_parsing_node + tool_planning_node with single LLM call
- Integrate LLM client and validator for intelligent routing
- Maintain compatibility with existing AgentState interface
- Provide clear error handling and fallback mechanisms
Architecture:
- llm_router_node: Main LangGraph node function
- RouterOrchestrator: Coordinates LLM client and validator
- Error handling: Graceful degradation when LLM or validation fails
- State management: Clean updates to AgentState structure
Design Principles:
- Single responsibility: Route query to appropriate tool
- Fail-safe operation: Always return valid state, never crash
- Transparent decisions: Log all routing decisions clearly
- Performance oriented: Fast validation, reasonable LLM timeouts
Integration Points:
- Uses LLMClient for intelligent query analysis
- Uses ProposalValidator for safety validation
- Updates AgentState for downstream tool execution
- Integrates with existing tool registry system
Future Extensions:
- TODO: Add conversation context integration for multi-turn dialogues
- TODO: Add multi-tool coordination for complex scenarios
- TODO: Add RAG integration for emergency procedure queries
- TODO: Add performance monitoring and LLM routing analytics
- TODO: Add A/B testing support for prompt optimization
"""
import asyncio
import logging
from typing import Dict, Any, List, Optional
import yaml
import os
from pathlib import Path
from datetime import datetime
from .state import AgentState, ToolCall, update_processing_status, add_error
from .llm_client import LLMClient, ToolProposal, LLMRoutingError
from .validator import ProposalValidator, ValidationResult, ValidationOutcome
from .registry import get_tool_registry
# Load LLM router config
def _load_llm_router_config():
    """
    Load config/llm_router_config.yaml located next to this module.

    Returns:
        The parsed configuration as a dict. Always returns {} (never None)
        when the file is missing, unreadable, or parses to an empty YAML
        document — yaml.safe_load returns None for an empty file, which
        would otherwise break the `LLM_ROUTER_CONFIG.get(...)` lookups
        performed elsewhere in this module.
    """
    config_path = Path(__file__).parent / "config" / "llm_router_config.yaml"
    try:
        with open(config_path, "r") as f:
            # `or {}` guards against empty YAML documents (safe_load -> None)
            return yaml.safe_load(f) or {}
    except Exception as e:
        logging.warning(f"Could not load llm_router_config.yaml: {e}")
        return {}
LLM_ROUTER_CONFIG = _load_llm_router_config()
class RouterOrchestrator:
    """
    Orchestrates LLM routing with validation
    This class coordinates the LLM client and proposal validator to provide
    safe, validated tool routing decisions. It serves as the main interface
    between the agent workflow and the LLM routing system.
    """
    def __init__(
        self,
        llm_provider: Optional[str] = None,
        llm_model: Optional[str] = None,
        llm_temperature: float = 0.1,
        llm_timeout: int = 10,
        fallback_provider: Optional[str] = None,
        fallback_model: Optional[str] = None
    ):
        """
        Initialize router with configurable LLM client and validator with fallback.
        Any argument left as None is resolved from llm_router_config.yaml;
        explicit arguments always take precedence over the config file.
        """
        # BUGFIX: previously the YAML config was consulted only when
        # llm_provider or llm_model was None, so a caller passing BOTH an
        # explicit provider and model silently lost the YAML-configured
        # fallback provider/model. Each setting is now resolved independently,
        # keeping the "fallback configurable from one YAML line" behavior.
        llm_config = LLM_ROUTER_CONFIG.get("llm", {})
        llm_provider = llm_provider or llm_config.get("provider")
        llm_model = llm_model or llm_config.get("model")
        fallback_config = llm_config.get("fallback", {})
        fallback_provider = fallback_provider or fallback_config.get("provider")
        fallback_model = fallback_model or fallback_config.get("model")
        self.llm_client = LLMClient(
            provider=llm_provider,
            model=llm_model,
            temperature=llm_temperature,
            timeout=llm_timeout,
            fallback_provider=fallback_provider,
            fallback_model=fallback_model
        )
        self.validator = ProposalValidator()
        self.tool_registry = get_tool_registry()
        # Load configuration files
        self._load_configurations()
        logging.info(f"🎯 RouterOrchestrator initialized with {llm_provider}/{llm_model}")
    def _load_configurations(self):
        """Load global configurations and tool-specific configs"""
        try:
            # Load global geography configuration
            config_dir = Path(__file__).parent / "config"
            geography_path = config_dir / "geography.yaml"
            if geography_path.exists():
                with open(geography_path, 'r', encoding='utf-8') as f:
                    self.geography_config = yaml.safe_load(f)
                logging.info("📍 Loaded geography configuration")
            else:
                self.geography_config = None
                logging.warning("📍 Geography configuration not found")
            # Load tool registry configuration
            tool_registry_path = config_dir / "tool_registry.yaml"
            if tool_registry_path.exists():
                with open(tool_registry_path, 'r', encoding='utf-8') as f:
                    self.tool_registry_config = yaml.safe_load(f)
                logging.info("🔧 Loaded tool registry configuration")
            else:
                self.tool_registry_config = None
                logging.warning("🔧 Tool registry configuration not found")
            # Load OMIRL sensor types
            self._load_omirl_config()
        except Exception as e:
            logging.error(f"❌ Error loading configurations: {e}")
            self.geography_config = None
            self.tool_registry_config = None
            self.omirl_config = None
    def _load_omirl_config(self):
        """Load OMIRL-specific configuration"""
        try:
            omirl_config_path = Path(__file__).parent.parent / "tools" / "omirl" / "config" / "parameters.yaml"
            if omirl_config_path.exists():
                with open(omirl_config_path, 'r', encoding='utf-8') as f:
                    self.omirl_config = yaml.safe_load(f)
                logging.info("🌦️ Loaded OMIRL configuration")
            else:
                self.omirl_config = None
                logging.warning("🌦️ OMIRL configuration not found")
        except Exception as e:
            logging.error(f"❌ Error loading OMIRL configuration: {e}")
            self.omirl_config = None
    async def route_query(
        self,
        user_query: str,
        conversation_context: Optional[List[Dict[str, str]]] = None,
        user_context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Route user query to appropriate tool with full validation
        This is the main entry point for LLM-based routing. It coordinates
        the LLM client and validator to produce safe, executable tool calls.
        Args:
            user_query: User input in Italian or English
            conversation_context: Previous conversation history
            user_context: User session and metadata
        Returns:
            Routing result with tool calls or clarification requests
        """
        start_time = datetime.now()
        logging.info(f"🚀 Routing query: {user_query[:100]}...")
        try:
            # 1. Get available tools for LLM
            available_tools = self._get_available_tools_for_llm()
            # 2. Get LLM routing proposal
            logging.info("🤖 Requesting LLM routing proposal...")
            proposal = await self.llm_client.route_query(
                user_query=user_query,
                available_tools=available_tools,
                conversation_context=conversation_context
            )
            # Track which LLM was used
            llm_usage = self.llm_client.get_llm_usage_info()
            logging.info(f"📝 LLM proposal: {proposal.tool}/{proposal.task} (confidence: {proposal.confidence})")
            logging.info(f"🔧 LLM used: {llm_usage['provider']}/{llm_usage['model']}")
            # 3. Validate proposal
            logging.info("🛡️ Validating LLM proposal...")
            validation_outcome = await self.validator.validate_proposal(
                proposal=proposal,
                user_context=user_context
            )
            # 4. Process validation result
            routing_time = (datetime.now() - start_time).total_seconds()
            if validation_outcome.execution_approved:
                logging.info(f"✅ Routing approved ({routing_time:.3f}s): {proposal.tool}")
                return self._create_approved_result(proposal, validation_outcome, routing_time, llm_usage)
            elif validation_outcome.result == ValidationResult.CLARIFICATION:
                logging.info(f"❓ Clarification needed ({routing_time:.3f}s): {validation_outcome.reason}")
                return self._create_clarification_result(proposal, validation_outcome, routing_time, llm_usage)
            else:  # REJECTED
                logging.warning(f"❌ Routing rejected ({routing_time:.3f}s): {validation_outcome.reason}")
                return self._create_rejection_result(proposal, validation_outcome, routing_time, llm_usage)
        except LLMRoutingError as e:
            logging.error(f"🚨 LLM routing failed: {e}")
            return self._create_error_result(user_query, f"LLM routing error: {str(e)}")
        except Exception as e:
            logging.error(f"💥 Router orchestrator failed: {e}")
            return self._create_error_result(user_query, f"Internal routing error: {str(e)}")
    def _get_available_tools_for_llm(self) -> List[Dict[str, Any]]:
        """Get tool specifications formatted for LLM prompt with configuration data"""
        tools = []
        # Build tools based on tool registry configuration
        if self.tool_registry_config and "tools" in self.tool_registry_config:
            for tool_name, tool_config in self.tool_registry_config["tools"].items():
                # Skip disabled tools
                if not tool_config.get("enabled", True):
                    continue
                # Build formatted tool with configuration data
                formatted_tool = {
                    "name": tool_name,
                    "display_name": tool_config.get("display_name", tool_name),
                    "description": tool_config.get("description", ""),
                    "enabled": tool_config.get("enabled", True),
                    "tasks": tool_config.get("tasks", {}),
                }
                # Add geography data for all tools
                if self.geography_config:
                    # Extract Liguria region data for compatibility with prompt functions
                    liguria_data = self.geography_config.get("regions", {}).get("liguria", {})
                    formatted_tool["geography_data"] = liguria_data
                # Add sensor types for OMIRL tool
                if tool_name == "omirl_tool" and self.omirl_config:
                    sensor_types = self.omirl_config.get("sensor_types", [])
                    formatted_tool["sensor_types"] = sensor_types
                tools.append(formatted_tool)
        else:
            # Fallback to registry tools if no configuration
            registry_tools = self.tool_registry.get_tool_specs_for_llm()
            for tool_spec in registry_tools:
                formatted_tool = {
                    "name": tool_spec["name"],
                    "description": tool_spec["description"],
                    "parameters": tool_spec["parameters"],
                    "geographic_scope": "Liguria (Genova, Savona, Imperia, La Spezia)"
                }
                # Add geography data if available
                if self.geography_config:
                    # Extract Liguria region data for compatibility with prompt functions
                    liguria_data = self.geography_config.get("regions", {}).get("liguria", {})
                    formatted_tool["geography_data"] = liguria_data
                tools.append(formatted_tool)
        return tools
    def _create_approved_result(
        self,
        proposal: ToolProposal,
        validation: ValidationOutcome,
        routing_time: float,
        llm_usage: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Create result for approved routing"""
        # Use modified parameters if validator made changes
        final_params = validation.modified_params or proposal.params
        tool_call = ToolCall(
            tool_name=proposal.tool,
            task=proposal.task,
            parameters=final_params,
            reason=proposal.reasoning
        )
        return {
            "status": "approved",
            "tool_calls": [tool_call],
            "routing_confidence": proposal.confidence,
            "validation_confidence": validation.confidence,
            "routing_time_ms": int(routing_time * 1000),
            "llm_reasoning": proposal.reasoning,
            "validation_reason": validation.reason,
            "warnings": validation.warnings or [],
            "execution_ready": True,
            "llm_usage": llm_usage  # Add LLM usage tracking
        }
    def _create_clarification_result(
        self,
        proposal: ToolProposal,
        validation: ValidationOutcome,
        routing_time: float,
        llm_usage: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Create result for clarification request"""
        # Use LLM's clarification if available, otherwise validator's
        clarification_text = (
            proposal.clarification_request or
            getattr(validation, 'suggested_clarification', None) or
            validation.reason
        )
        return {
            "status": "clarification_needed",
            "clarification_request": clarification_text,
            "routing_confidence": proposal.confidence,
            "validation_confidence": validation.confidence,
            "routing_time_ms": int(routing_time * 1000),
            "llm_reasoning": proposal.reasoning,
            "validation_reason": validation.reason,
            "suggested_action": validation.suggested_action,
            "execution_ready": False,
            "llm_usage": llm_usage
        }
    def _create_rejection_result(
        self,
        proposal: ToolProposal,
        validation: ValidationOutcome,
        routing_time: float,
        llm_usage: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Create result for rejected routing"""
        return {
            "status": "rejected",
            "rejection_reason": validation.reason,
            "routing_confidence": proposal.confidence,
            "validation_confidence": validation.confidence,
            "routing_time_ms": int(routing_time * 1000),
            "llm_reasoning": proposal.reasoning,
            "validation_reason": validation.reason,
            "suggested_action": validation.suggested_action,
            "execution_ready": False,
            "llm_usage": llm_usage
        }
    def _create_error_result(self, user_query: str, error_message: str) -> Dict[str, Any]:
        """Create result for routing errors"""
        return {
            "status": "error",
            "error_message": error_message,
            "original_query": user_query,
            "routing_confidence": 0.0,
            "validation_confidence": 0.0,
            "fallback_message": "Mi dispiace, si è verificato un errore nel processare la tua richiesta. Puoi riprovare?",
            "execution_ready": False
        }
# Global router instance for node function
_router = None
def reset_router():
"""Reset global router instance (useful for testing)"""
global _router
_router = None
def get_router() -> RouterOrchestrator:
"""Get or create global router instance - config loaded from llm_router_config.yaml"""
global _router
if _router is None:
# RouterOrchestrator will load config from yaml automatically
_router = RouterOrchestrator()
logging.info("🚀 Router initialized from config")
return _router
async def llm_router_node(state: AgentState) -> AgentState:
    """
    LangGraph node performing single-step intelligent routing.

    Replaces the intent_parsing_node + tool_planning_node pair with one
    LLM call followed by deterministic validation, then writes the outcome
    back into the agent state.

    Args:
        state: Current agent state carrying user_message (and optionally
            conversation_history, session_id, user_id).
    Returns:
        The updated state. Depending on the routing outcome it gains:
        routing_result (always), planned_tools (approved),
        clarification_request (clarification), rejection_reason (rejected),
        errors / agent_response (failures), plus performance_metrics and
        llm_usage tracking.
    """
    logging.info("🎯 LLM Router Node: Starting intelligent routing...")
    state = update_processing_status(state, "llm_routing", "Analyzing query with LLM router")
    try:
        user_query = state.get("user_message", "").strip()
        # Guard clause: nothing to route.
        if not user_query:
            logging.warning("⚠️ Empty user query received")
            state = add_error(state, "Empty user query", "llm_routing")
            state["agent_response"] = "Mi dispiace, non ho ricevuto nessuna richiesta. Puoi ripetere?"
            return state
        # TODO: enrich user context with more session metadata from state
        user_context = {
            "session_id": state.get("session_id"),
            "user_id": state.get("user_id"),
            "timestamp": datetime.now().isoformat()
        }
        # Delegate to the shared router (LLM proposal + validation)
        routing_result = await get_router().route_query(
            user_query=user_query,
            conversation_context=state.get("conversation_history", []),
            user_context=user_context
        )
        state["routing_result"] = routing_result
        # Record which LLM actually served the routing call, when reported
        if "llm_usage" in routing_result:
            state.setdefault("llm_usage", {})["routing"] = routing_result["llm_usage"]
        # Translate the routing outcome into state updates
        status = routing_result["status"]
        if status == "approved":
            state["planned_tools"] = routing_result["tool_calls"]
            state = update_processing_status(state, "approved", "Tools planned for execution")
            logging.info(f"✅ Tool execution planned: {len(routing_result['tool_calls'])} tools")
        elif status == "clarification_needed":
            state["clarification_request"] = routing_result["clarification_request"]
            state = update_processing_status(state, "needs_clarification", "Clarification needed")
            logging.info("❓ Clarification requested from user")
        elif status == "rejected":
            state["rejection_reason"] = routing_result["rejection_reason"]
            state = update_processing_status(state, "rejected", "Request rejected")
            logging.info("❌ Request rejected by validator")
        else:  # routing error - record it for downstream fallback handling
            state = add_error(state, routing_result["error_message"], "llm_routing")
            state = update_processing_status(state, "error", "Routing error occurred")
            logging.error("💥 Routing error occurred")
        # Timing metrics for observability
        routing_time = routing_result.get("routing_time_ms", 0)
        state.setdefault("performance_metrics", {})["llm_routing_ms"] = routing_time
        logging.info(f"🎯 LLM Router Node completed ({routing_time}ms)")
        return state
    except Exception as e:
        # Fail-safe: never propagate; return a valid state with an error note
        logging.error(f"💥 LLM Router Node failed: {e}")
        state = add_error(state, f"LLM router node error: {str(e)}", "llm_routing")
        state["agent_response"] = "Mi dispiace, si è verificato un errore tecnico. Puoi riprovare?"
        return state
# TODO: Future conversation context integration
async def llm_router_with_context_node(state: "AgentState") -> "AgentState":
    """
    Enhanced LLM router with conversation context integration.

    This future version will better handle multi-turn conversations
    and maintain context across multiple queries in a session.

    Raises:
        NotImplementedError: always — this is a placeholder node.
    """
    # Fail loudly if wired into a graph prematurely: the previous `pass`
    # body silently returned None where an AgentState was expected.
    raise NotImplementedError("llm_router_with_context_node is not implemented yet")
# TODO: Future emergency priority router
async def emergency_priority_router_node(state: "AgentState") -> "AgentState":
    """
    Emergency-aware router with priority handling.

    This future version will detect emergency scenarios and
    provide fast-track routing with reduced validation delays.

    Raises:
        NotImplementedError: always — this is a placeholder node.
    """
    # Fail loudly if wired into a graph prematurely: the previous `pass`
    # body silently returned None where an AgentState was expected.
    raise NotImplementedError("emergency_priority_router_node is not implemented yet")
# TODO: Future multi-tool coordination router
async def multi_tool_coordination_router_node(state: "AgentState") -> "AgentState":
    """
    Router capable of coordinating multiple tools.

    This future version will handle complex scenarios requiring
    multiple data sources (OMIRL + bulletins + traffic).

    Raises:
        NotImplementedError: always — this is a placeholder node.
    """
    # Fail loudly if wired into a graph prematurely: the previous `pass`
    # body silently returned None where an AgentState was expected.
    raise NotImplementedError("multi_tool_coordination_router_node is not implemented yet")