Spaces:
Runtime error
Runtime error
"""
Deterministic Validator for LLM Router Proposals

This module provides fast, deterministic validation of LLM routing proposals
focused on tool selection, task orchestration, and security policies.

Purpose:
- Validate tool availability and task selection
- Orchestrate multi-tool and multi-task workflows
- Enforce security policies and rate limits
- Provide clear rejection reasons for transparency

Architecture:
- ProposalValidator: Main validation orchestrator
- ToolValidator: Tool existence and task validation
- SecurityValidator: Security policy enforcement
- ValidationResult: Structured validation outcome

Design Principles:
- Fast deterministic checks (no LLM calls)
- Focus on routing and orchestration decisions
- Clear pass/fail decisions with explanations
- Fail-safe defaults (reject when in doubt)
- Delegate detailed parameter validation to tools

Future Extensions:
- TODO: Add service health checking before tool execution
- TODO: Add rate limiting per user/session
- TODO: Add emergency priority validation and bypass rules
- TODO: Add multi-tool dependency validation and sequencing
- TODO: Add RAG knowledge base integration for procedure validation
- TODO: Add bulletin alert context for enhanced routing logic
"""
| import re | |
| import logging | |
| import yaml | |
| from typing import Dict, Any, List, Optional, Tuple, Set | |
| from dataclasses import dataclass | |
| from datetime import datetime, timedelta | |
| from enum import Enum | |
| from .llm_client import ToolProposal | |
class ValidationResult(Enum):
    """Decision a validator can reach for a routing proposal."""

    # The proposal may be executed.
    APPROVED = "approved"
    # The proposal is refused; the outcome carries the explanation.
    REJECTED = "rejected"
    # More information is needed from the user before deciding.
    CLARIFICATION = "clarification"
@dataclass
class ValidationOutcome:
    """
    Structured result of proposal validation.

    Carries the decision together with the reasoning and the suggested
    follow-up, so callers can act on it and explain it to the user.

    The class is a dataclass: every validator in this module builds it with
    keyword arguments, which requires the generated ``__init__``.

    Attributes:
        result: Validation decision (approved/rejected/clarification)
        confidence: Confidence in validation decision (0.0-1.0)
        reason: Detailed explanation of decision
        suggested_action: What should happen next
        modified_params: Any parameter modifications made during validation
        warnings: Non-fatal issues detected; ``None`` when none were recorded
        execution_approved: Whether tool execution should proceed
    """

    result: ValidationResult
    confidence: float
    reason: str
    suggested_action: str
    modified_params: Optional[Dict[str, Any]] = None
    # Default stays None (not a shared mutable list); the annotation now
    # says so explicitly instead of claiming a plain List[str].
    warnings: Optional[List[str]] = None
    # Fail-safe default: execution is only approved when set explicitly.
    execution_approved: bool = False
class ProposalValidator:
    """
    Entry point for validating LLM router proposals.

    Chains a tool/task routing check and a security-policy check, stopping
    at the first stage that does not approve the proposal.
    """

    def __init__(self):
        """Instantiate the routing and security sub-validators."""
        self.tool_validator = ToolValidator()
        self.security_validator = SecurityValidator()

        # TODO: Add future validators for orchestration
        # self.service_health_validator = ServiceHealthValidator()
        # self.rate_limit_validator = RateLimitValidator()
        # self.emergency_priority_validator = EmergencyPriorityValidator()
        # self.workflow_orchestrator = WorkflowOrchestrator()

        logging.info("🛡️ ProposalValidator initialized (tool/task focus)")

    async def validate_proposal(
        self,
        proposal: ToolProposal,
        user_context: Optional[Dict[str, Any]] = None
    ) -> ValidationOutcome:
        """
        Validate an LLM router proposal for tool selection and security.

        Only routing decisions are checked here; detailed parameter
        validation is left to the individual tools.

        Args:
            proposal: LLM router proposal to validate
            user_context: Additional context (user_id, session, etc.),
                forwarded to the security validator

        Returns:
            The first non-approved outcome produced by a stage, or an
            approved outcome carrying the warnings of all stages. Any
            exception raised while validating is logged and turned into a
            rejected outcome.
        """
        started_at = datetime.now()
        logging.info(f"🔍 Validating routing for: {proposal.tool}/{proposal.task}")

        try:
            # Stage 1: tool and task selection
            routing_outcome = await self.tool_validator.validate(proposal)
            if routing_outcome.result is not ValidationResult.APPROVED:
                return routing_outcome

            # Stage 2: security policy
            security_outcome = self.security_validator.validate(proposal, user_context)
            if security_outcome.result is not ValidationResult.APPROVED:
                return security_outcome

            # TODO: Future orchestration checks
            # 3. Multi-tool workflow validation
            #    workflow_result = self.workflow_orchestrator.validate(proposal, user_context)
            #    if workflow_result.result != ValidationResult.APPROVED:
            #        return workflow_result
            # 4. Service health validation
            #    health_result = await self.service_health_validator.validate(proposal)
            #    if health_result.result != ValidationResult.APPROVED:
            #        return health_result
            # 5. Rate limiting validation
            #    rate_result = self.rate_limit_validator.validate(proposal, user_context)
            #    if rate_result.result != ValidationResult.APPROVED:
            #        return rate_result

            # Every stage approved the proposal
            elapsed_seconds = (datetime.now() - started_at).total_seconds()
            logging.info(f"✅ Routing validated successfully ({elapsed_seconds:.3f}s)")

            stage_warnings = self._collect_warnings([routing_outcome, security_outcome])
            return ValidationOutcome(
                result=ValidationResult.APPROVED,
                confidence=1.0,
                reason="Tool selection and security validation passed",
                suggested_action="Execute tool with provided parameters (detailed validation delegated to tool)",
                execution_approved=True,
                warnings=stage_warnings
            )

        except Exception as e:
            # Fail safe: an internal error never results in an approval.
            logging.error(f"❌ Validation error: {e}")
            return ValidationOutcome(
                result=ValidationResult.REJECTED,
                confidence=1.0,
                reason=f"Validation system error: {e!s}",
                suggested_action="Request user to try again later",
                execution_approved=False
            )

    def _collect_warnings(self, validation_results: List[ValidationOutcome]) -> List[str]:
        """Flatten the warnings of all given outcomes into one list, in order."""
        return [
            warning
            for outcome in validation_results
            if outcome.warnings
            for warning in outcome.warnings
        ]
class ToolValidator:
    """
    Validates tool selection and task routing decisions.

    Ensures the requested tool and task exist in the registry and are
    enabled. Only the routing decision is checked; detailed parameter
    validation is left to the tools themselves.
    """

    def __init__(self):
        """Initialize with available tools from configuration"""
        self._load_tool_registry()

    def _load_tool_registry(self):
        """
        Populate ``self.available_tools`` from ``config/tool_registry.yaml``.

        The file is looked up next to this module. If it cannot be read or
        parsed, or its top level is not a mapping, a warning is logged and a
        built-in fallback registry is used instead.
        """
        try:
            import os

            config_path = os.path.join(os.path.dirname(__file__), 'config', 'tool_registry.yaml')
            # YAML is UTF-8 by convention; do not depend on the platform
            # default encoding.
            with open(config_path, 'r', encoding='utf-8') as f:
                config = yaml.safe_load(f)

            # An empty file parses to None; make the fallback an explicit
            # decision rather than the side effect of a TypeError.
            if not isinstance(config, dict):
                raise ValueError("tool registry must be a mapping at top level")

            self.available_tools = self._build_registry(config)
            logging.info(f"🔧 ToolValidator loaded registry: {list(self.available_tools.keys())}")

        except Exception as e:
            # Fallback to basic configuration
            logging.warning(f"⚠️ Could not load tool registry, using fallback: {e}")
            self.available_tools = self._fallback_registry()

    @staticmethod
    def _build_registry(config: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
        """
        Convert a parsed registry configuration into tool specifications.

        Malformed entries (a tool or its ``tasks`` that is not a mapping)
        are skipped with a warning, so one bad entry does not discard the
        whole registry. A missing or empty ``tasks`` key yields a tool with
        no tasks.

        Args:
            config: Parsed YAML mapping, expected to hold a ``tools`` mapping

        Returns:
            Mapping of tool name to its specification dictionary
        """
        registry: Dict[str, Dict[str, Any]] = {}
        tools = config.get("tools") or {}
        if not isinstance(tools, dict):
            logging.warning("⚠️ Ignoring tool registry: 'tools' is not a mapping")
            return registry

        for tool_name, tool_config in tools.items():
            if not isinstance(tool_config, dict):
                logging.warning(f"⚠️ Skipping tool '{tool_name}': configuration is not a mapping")
                continue

            # `tasks:` with no value parses to None; treat it as "no tasks".
            tasks = tool_config.get("tasks") or {}
            if not isinstance(tasks, dict):
                logging.warning(f"⚠️ Skipping tool '{tool_name}': 'tasks' is not a mapping")
                continue

            registry[tool_name] = {
                "name": tool_config.get("name", tool_name),
                "display_name": tool_config.get("display_name", tool_name),
                "description": tool_config.get("description", ""),
                "enabled": tool_config.get("enabled", True),
                "priority": tool_config.get("priority", 0.5),
                "capabilities": tool_config.get("capabilities", []),
                "geographic_scope": tool_config.get("geographic_scope", "liguria"),
                "data_types": tool_config.get("data_types", []),
                "tasks": list(tasks.keys()),
                "task_configs": tasks
            }
        return registry

    @staticmethod
    def _fallback_registry() -> Dict[str, Dict[str, Any]]:
        """Return the built-in registry used when the configuration cannot be loaded."""
        return {
            "omirl_tool": {
                "name": "omirl_tool",
                "display_name": "OMIRL Weather Monitoring",
                "description": "Real-time weather monitoring for Liguria region",
                "enabled": True,
                "priority": 0.8,
                "capabilities": ["real_time_weather_data", "station_monitoring"],
                "geographic_scope": "liguria",
                "data_types": ["real_time", "observational"],
                "tasks": ["valori_stazioni", "massimi_precipitazione"],
                "task_configs": {
                    "valori_stazioni": {"enabled": True},
                    "massimi_precipitazione": {"enabled": True}
                }
            }
        }

    @staticmethod
    def _task_enabled(tool_spec: Dict[str, Any], task_name: str) -> bool:
        """Return whether a task is enabled; tasks without a usable config default to enabled."""
        task_config = tool_spec["task_configs"].get(task_name)
        if not isinstance(task_config, dict):
            # A task declared with no body parses to None.
            task_config = {}
        return bool(task_config.get("enabled", True))

    @staticmethod
    def _reject(reason: str, suggested_action: str) -> ValidationOutcome:
        """Build a rejected outcome with full confidence."""
        return ValidationOutcome(
            result=ValidationResult.REJECTED,
            confidence=1.0,
            reason=reason,
            suggested_action=suggested_action
        )

    async def validate(self, proposal: ToolProposal) -> ValidationOutcome:
        """
        Validate tool selection and task routing.

        Checks, in order: the tool is registered, the tool is enabled, the
        task belongs to the tool, the task is enabled, and the parameters
        are a dictionary.

        Args:
            proposal: Tool proposal to validate

        Returns:
            ValidationOutcome for routing validation; rejected at the first
            failing check, approved otherwise
        """
        # Check tool exists
        if proposal.tool not in self.available_tools:
            available_tools = list(self.available_tools.keys())
            return self._reject(
                f"Tool '{proposal.tool}' not available",
                f"Available tools: {', '.join(available_tools)}"
            )

        tool_spec = self.available_tools[proposal.tool]

        # Check if tool is enabled
        if not tool_spec.get("enabled", True):
            return self._reject(
                f"Tool '{proposal.tool}' is currently disabled",
                "Contact administrator or try alternative tools"
            )

        # Check task exists within tool
        if proposal.task not in tool_spec["tasks"]:
            available_tasks = ", ".join(tool_spec["tasks"])
            return self._reject(
                f"Task '{proposal.task}' not available for {proposal.tool}",
                f"Available tasks for {tool_spec['display_name']}: {available_tasks}"
            )

        # Check if specific task is enabled
        if not self._task_enabled(tool_spec, proposal.task):
            enabled_tasks = [t for t in tool_spec["tasks"] if self._task_enabled(tool_spec, t)]
            return self._reject(
                f"Task '{proposal.task}' is currently disabled for {proposal.tool}",
                f"Try other available tasks: {', '.join(enabled_tasks)}"
            )

        # Basic parameter structure validation
        if not isinstance(proposal.params, dict):
            return self._reject(
                "Parameters must be provided as a dictionary",
                "Provide parameters in correct format"
            )

        # Log routing decision for debugging
        logging.info(f"🎯 Routing approved: {proposal.tool}/{proposal.task} (priority: {tool_spec.get('priority', 'unknown')})")

        # TODO: Add multi-tool orchestration logic here
        # TODO: Add task sequencing validation
        # TODO: Add capability matching validation

        return ValidationOutcome(
            result=ValidationResult.APPROVED,
            confidence=1.0,
            reason=f"Tool routing validated: {tool_spec['display_name']} -> {proposal.task}",
            suggested_action="Proceed with tool execution (parameter validation delegated to tool)",
            execution_approved=True
        )
class SecurityValidator:
    """
    Validates security policies and access controls.

    Rejects proposals whose parameters contain text matching one of the
    blocked patterns. Strings are scanned wherever they occur in the
    parameters, including inside nested dictionaries and sequences, so a
    payload cannot bypass the check by being wrapped in a container.
    """

    # TODO: Load from configuration
    BLOCKED_PATTERNS = [
        # Potential injection attempts
        r"<script.*?>",
        r"javascript:",
        r"eval\s*\(",
        # File system attempts
        r"\.\./",
        r"/etc/",
        r"C:\\",
        # Command injection attempts
        r";\s*(rm|del|format)",
        r"\|\s*(curl|wget)"
    ]

    @classmethod
    def _find_blocked_pattern(cls, value: Any) -> Optional[str]:
        """
        Return the first blocked pattern matched anywhere inside ``value``.

        Strings are matched case-insensitively against ``BLOCKED_PATTERNS``.
        Dictionaries (keys and values), lists, tuples and sets are searched
        recursively; every other type is ignored.

        Args:
            value: A parameter name or value of arbitrary structure

        Returns:
            The matching pattern source, or None when nothing matches
        """
        # Explicit stack instead of recursion: deep nesting cannot exhaust
        # the interpreter stack, and `seen` stops self-referencing containers.
        pending = [value]
        seen = set()
        while pending:
            current = pending.pop()
            if isinstance(current, str):
                for pattern in cls.BLOCKED_PATTERNS:
                    if re.search(pattern, current, re.IGNORECASE):
                        return pattern
            elif isinstance(current, (dict, list, tuple, set, frozenset)):
                if id(current) in seen:
                    continue
                seen.add(id(current))
                if isinstance(current, dict):
                    pending.extend(current.keys())
                    pending.extend(current.values())
                else:
                    pending.extend(current)
        return None

    def validate(
        self,
        proposal: ToolProposal,
        user_context: Optional[Dict[str, Any]] = None
    ) -> ValidationOutcome:
        """
        Validate security policies.

        Args:
            proposal: Tool proposal to validate
            user_context: User session information (currently unused)

        Returns:
            ValidationOutcome for security validation: rejected when the
            parameters are not a dictionary or contain a blocked pattern,
            approved otherwise
        """
        params = proposal.params
        # Fail safe: parameters that cannot be inspected are rejected
        # instead of raising AttributeError on `.items()`.
        if not isinstance(params, dict):
            return ValidationOutcome(
                result=ValidationResult.REJECTED,
                confidence=1.0,
                reason="Parameters must be provided as a dictionary",
                suggested_action="Provide parameters in correct format"
            )

        # Check for malicious patterns in parameter names and values
        for param_name, param_value in params.items():
            pattern = self._find_blocked_pattern(param_name)
            if pattern is None:
                pattern = self._find_blocked_pattern(param_value)
            if pattern is not None:
                logging.warning(f"🚨 Blocked malicious pattern in {param_name}: {pattern}")
                return ValidationOutcome(
                    result=ValidationResult.REJECTED,
                    confidence=1.0,
                    reason=f"Security policy violation in parameter '{param_name}'",
                    suggested_action="Block request and log security event"
                )

        # TODO: Add user authentication validation
        # TODO: Add session validation
        # TODO: Add rate limiting checks
        # TODO: Add emergency bypass validation

        return ValidationOutcome(
            result=ValidationResult.APPROVED,
            confidence=1.0,
            reason="Security validation passed",
            suggested_action="Proceed with security clearance"
        )
# TODO: Future orchestration capabilities
class WorkflowOrchestrator:
    """
    Placeholder for multi-tool and multi-task workflow orchestration.

    Intended for scenarios that need several tools or sequential task
    execution. No behavior is implemented yet.
    """
# TODO: Future service health validator
class ServiceHealthValidator:
    """
    Placeholder for service availability checks before tool execution.

    Intended to verify that external services (OMIRL, bulletins, etc.) are
    reachable before a tool is run. No behavior is implemented yet.
    """
# TODO: Future rate limiting validator
class RateLimitValidator:
    """
    Placeholder for rate-limit and usage-quota validation.

    Intended to prevent abuse by enforcing per-user and per-session limits
    on tool execution. No behavior is implemented yet.
    """
# TODO: Future emergency priority validator
class EmergencyPriorityValidator:
    """
    Placeholder for emergency-scenario and priority validation.

    Intended to provide bypass mechanisms for emergencies and to make sure
    critical requests are prioritized. No behavior is implemented yet.
    """