""" Deterministic Validator for LLM Router Proposals This module provides fast, deterministic validation of LLM routing proposals focused on tool selection, task orchestration, and security policies. Purpose: - Validate tool availability and task selection - Orchestrate multi-tool and multi-task workflows - Enforce security policies and rate limits - Provide clear rejection reasons for transparency Architecture: - ProposalValidator: Main validation orchestrator - ToolValidator: Tool existence and task validation - SecurityValidator: Security policy enforcement - ValidationResult: Structured validation outcome Design Principles: - Fast deterministic checks (no LLM calls) - Focus on routing and orchestration decisions - Clear pass/fail decisions with explanations - Fail-safe defaults (reject when in doubt) - Delegate detailed parameter validation to tools Future Extensions: - TODO: Add service health checking before tool execution - TODO: Add rate limiting per user/session - TODO: Add emergency priority validation and bypass rules - TODO: Add multi-tool dependency validation and sequencing - TODO: Add RAG knowledge base integration for procedure validation - TODO: Add bulletin alert context for enhanced routing logic """ import re import logging import yaml from typing import Dict, Any, List, Optional, Tuple, Set from dataclasses import dataclass from datetime import datetime, timedelta from enum import Enum from .llm_client import ToolProposal class ValidationResult(Enum): """Validation decision outcomes""" APPROVED = "approved" # Execute the proposal REJECTED = "rejected" # Reject with explanation CLARIFICATION = "clarification" # Need more info from user @dataclass class ValidationOutcome: """ Structured result of proposal validation This provides clear feedback on validation decisions with specific reasons and suggested actions. Attributes: result: Validation decision (approved/rejected/clarification) confidence: Confidence in validation decision (0.0-1.0) reason: Detailed explanation of decision suggested_action: What should happen next modified_params: Any parameter modifications made during validation warnings: Non-fatal issues detected execution_approved: Whether tool execution should proceed """ result: ValidationResult confidence: float reason: str suggested_action: str modified_params: Optional[Dict[str, Any]] = None warnings: List[str] = None execution_approved: bool = False class ProposalValidator: """ Main validator for LLM router proposals This orchestrates validation focused on tool selection, task routing, and security policies. """ def __init__(self): """Initialize validator with focused sub-validators""" self.tool_validator = ToolValidator() self.security_validator = SecurityValidator() # TODO: Add future validators for orchestration # self.service_health_validator = ServiceHealthValidator() # self.rate_limit_validator = RateLimitValidator() # self.emergency_priority_validator = EmergencyPriorityValidator() # self.workflow_orchestrator = WorkflowOrchestrator() logging.info("🛡️ ProposalValidator initialized (tool/task focus)") async def validate_proposal( self, proposal: ToolProposal, user_context: Optional[Dict[str, Any]] = None ) -> ValidationOutcome: """ Validate LLM router proposal for tool selection and security This focuses on routing decisions and delegates detailed parameter validation to the individual tools. Args: proposal: LLM router proposal to validate user_context: Additional context (user_id, session, etc.) Returns: ValidationOutcome with routing decision and reasoning """ start_time = datetime.now() logging.info(f"🔍 Validating routing for: {proposal.tool}/{proposal.task}") try: # 1. Tool and task selection validation tool_result = await self.tool_validator.validate(proposal) if tool_result.result != ValidationResult.APPROVED: return tool_result # 2. Security policy validation security_result = self.security_validator.validate(proposal, user_context) if security_result.result != ValidationResult.APPROVED: return security_result # TODO: Future orchestration checks # 3. Multi-tool workflow validation # workflow_result = self.workflow_orchestrator.validate(proposal, user_context) # if workflow_result.result != ValidationResult.APPROVED: # return workflow_result # 4. Service health validation # health_result = await self.service_health_validator.validate(proposal) # if health_result.result != ValidationResult.APPROVED: # return health_result # 5. Rate limiting validation # rate_result = self.rate_limit_validator.validate(proposal, user_context) # if rate_result.result != ValidationResult.APPROVED: # return rate_result # All validations passed validation_time = (datetime.now() - start_time).total_seconds() logging.info(f"✅ Routing validated successfully ({validation_time:.3f}s)") return ValidationOutcome( result=ValidationResult.APPROVED, confidence=1.0, reason="Tool selection and security validation passed", suggested_action="Execute tool with provided parameters (detailed validation delegated to tool)", execution_approved=True, warnings=self._collect_warnings([tool_result, security_result]) ) except Exception as e: logging.error(f"❌ Validation error: {e}") return ValidationOutcome( result=ValidationResult.REJECTED, confidence=1.0, reason=f"Validation system error: {str(e)}", suggested_action="Request user to try again later", execution_approved=False ) def _collect_warnings(self, validation_results: List[ValidationOutcome]) -> List[str]: """Collect warnings from all validation results""" warnings = [] for result in validation_results: if result.warnings: warnings.extend(result.warnings) return warnings class ToolValidator: """ Validates tool selection and task routing decisions This ensures requested tools and tasks exist and are properly configured, focusing on routing decisions rather than detailed parameter validation. """ def __init__(self): """Initialize with available tools from configuration""" self._load_tool_registry() def _load_tool_registry(self): """Load tool specifications from configuration""" try: import os # Load tool registry configuration config_path = os.path.join(os.path.dirname(__file__), 'config', 'tool_registry.yaml') with open(config_path, 'r') as f: config = yaml.safe_load(f) # Build available_tools from registry configuration self.available_tools = {} if "tools" in config: for tool_name, tool_config in config["tools"].items(): # Extract tasks and their configurations tasks = tool_config.get("tasks", {}) self.available_tools[tool_name] = { "name": tool_config.get("name", tool_name), "display_name": tool_config.get("display_name", tool_name), "description": tool_config.get("description", ""), "enabled": tool_config.get("enabled", True), "priority": tool_config.get("priority", 0.5), "capabilities": tool_config.get("capabilities", []), "geographic_scope": tool_config.get("geographic_scope", "liguria"), "data_types": tool_config.get("data_types", []), "tasks": list(tasks.keys()), "task_configs": tasks } logging.info(f"🔧 ToolValidator loaded registry: {list(self.available_tools.keys())}") except Exception as e: # Fallback to basic configuration logging.warning(f"⚠️ Could not load tool registry, using fallback: {e}") self.available_tools = { "omirl_tool": { "name": "omirl_tool", "display_name": "OMIRL Weather Monitoring", "description": "Real-time weather monitoring for Liguria region", "enabled": True, "priority": 0.8, "capabilities": ["real_time_weather_data", "station_monitoring"], "geographic_scope": "liguria", "data_types": ["real_time", "observational"], "tasks": ["valori_stazioni", "massimi_precipitazione"], "task_configs": { "valori_stazioni": {"enabled": True}, "massimi_precipitazione": {"enabled": True} } } } async def validate(self, proposal: ToolProposal) -> ValidationOutcome: """ Validate tool selection and task routing Args: proposal: Tool proposal to validate Returns: ValidationOutcome for routing validation """ # Check tool exists and is enabled if proposal.tool not in self.available_tools: available_tools = list(self.available_tools.keys()) return ValidationOutcome( result=ValidationResult.REJECTED, confidence=1.0, reason=f"Tool '{proposal.tool}' not available", suggested_action=f"Available tools: {', '.join(available_tools)}" ) tool_spec = self.available_tools[proposal.tool] # Check if tool is enabled if not tool_spec.get("enabled", True): return ValidationOutcome( result=ValidationResult.REJECTED, confidence=1.0, reason=f"Tool '{proposal.tool}' is currently disabled", suggested_action="Contact administrator or try alternative tools" ) # Check task exists within tool if proposal.task not in tool_spec["tasks"]: available_tasks = ", ".join(tool_spec["tasks"]) return ValidationOutcome( result=ValidationResult.REJECTED, confidence=1.0, reason=f"Task '{proposal.task}' not available for {proposal.tool}", suggested_action=f"Available tasks for {tool_spec['display_name']}: {available_tasks}" ) # Check if specific task is enabled task_config = tool_spec["task_configs"].get(proposal.task, {}) if not task_config.get("enabled", True): return ValidationOutcome( result=ValidationResult.REJECTED, confidence=1.0, reason=f"Task '{proposal.task}' is currently disabled for {proposal.tool}", suggested_action=f"Try other available tasks: {', '.join([t for t in tool_spec['tasks'] if tool_spec['task_configs'].get(t, {}).get('enabled', True)])}" ) # Basic parameter structure validation if not isinstance(proposal.params, dict): return ValidationOutcome( result=ValidationResult.REJECTED, confidence=1.0, reason="Parameters must be provided as a dictionary", suggested_action="Provide parameters in correct format" ) # Log routing decision for debugging logging.info(f"🎯 Routing approved: {proposal.tool}/{proposal.task} (priority: {tool_spec.get('priority', 'unknown')})") # TODO: Add multi-tool orchestration logic here # TODO: Add task sequencing validation # TODO: Add capability matching validation return ValidationOutcome( result=ValidationResult.APPROVED, confidence=1.0, reason=f"Tool routing validated: {tool_spec['display_name']} -> {proposal.task}", suggested_action="Proceed with tool execution (parameter validation delegated to tool)", execution_approved=True ) class SecurityValidator: """ Validates security policies and access controls This enforces security policies to prevent abuse and ensure compliance with operational guidelines. """ # TODO: Load from configuration BLOCKED_PATTERNS = [ # Potential injection attempts r"", r"javascript:", r"eval\s*\(", # File system attempts r"\.\./", r"/etc/", r"C:\\", # Command injection attempts r";\s*(rm|del|format)", r"\|\s*(curl|wget)" ] def validate( self, proposal: ToolProposal, user_context: Optional[Dict[str, Any]] = None ) -> ValidationOutcome: """ Validate security policies Args: proposal: Tool proposal to validate user_context: User session information Returns: ValidationOutcome for security validation """ # Check for malicious patterns in parameters for param_name, param_value in proposal.params.items(): if isinstance(param_value, str): for pattern in self.BLOCKED_PATTERNS: if re.search(pattern, param_value, re.IGNORECASE): logging.warning(f"🚨 Blocked malicious pattern in {param_name}: {pattern}") return ValidationOutcome( result=ValidationResult.REJECTED, confidence=1.0, reason=f"Security policy violation in parameter '{param_name}'", suggested_action="Block request and log security event" ) # TODO: Add user authentication validation # TODO: Add session validation # TODO: Add rate limiting checks # TODO: Add emergency bypass validation return ValidationOutcome( result=ValidationResult.APPROVED, confidence=1.0, reason="Security validation passed", suggested_action="Proceed with security clearance" ) # TODO: Future orchestration capabilities class WorkflowOrchestrator: """ Orchestrates multi-tool and multi-task workflows This will handle complex scenarios requiring multiple tools or sequential task execution. """ pass # TODO: Future service health validator class ServiceHealthValidator: """ Validates service availability before tool execution This checks if external services (OMIRL, bulletins, etc.) are available before attempting tool execution. """ pass # TODO: Future rate limiting validator class RateLimitValidator: """ Validates rate limits and usage quotas This prevents abuse by enforcing per-user and per-session rate limits on tool execution. """ pass # TODO: Future emergency priority validator class EmergencyPriorityValidator: """ Validates emergency scenarios and priority handling This provides bypass mechanisms for emergency situations and ensures critical requests are prioritized. """ pass