operations / agent /validator.py
jbbove's picture
feat: Complete agent architecture refactoring and testing framework
fbd9fda
"""
Deterministic Validator for LLM Router Proposals
This module provides fast, deterministic validation of LLM routing proposals
focused on tool selection, task orchestration, and security policies.
Purpose:
- Validate tool availability and task selection
- Orchestrate multi-tool and multi-task workflows
- Enforce security policies and rate limits
- Provide clear rejection reasons for transparency
Architecture:
- ProposalValidator: Main validation orchestrator
- ToolValidator: Tool existence and task validation
- SecurityValidator: Security policy enforcement
- ValidationResult: Structured validation outcome
Design Principles:
- Fast deterministic checks (no LLM calls)
- Focus on routing and orchestration decisions
- Clear pass/fail decisions with explanations
- Fail-safe defaults (reject when in doubt)
- Delegate detailed parameter validation to tools
Future Extensions:
- TODO: Add service health checking before tool execution
- TODO: Add rate limiting per user/session
- TODO: Add emergency priority validation and bypass rules
- TODO: Add multi-tool dependency validation and sequencing
- TODO: Add RAG knowledge base integration for procedure validation
- TODO: Add bulletin alert context for enhanced routing logic
"""
import re
import logging
import yaml
from typing import Dict, Any, List, Optional, Tuple, Set
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from .llm_client import ToolProposal
class ValidationResult(Enum):
"""Validation decision outcomes"""
APPROVED = "approved" # Execute the proposal
REJECTED = "rejected" # Reject with explanation
CLARIFICATION = "clarification" # Need more info from user
@dataclass
class ValidationOutcome:
"""
Structured result of proposal validation
This provides clear feedback on validation decisions
with specific reasons and suggested actions.
Attributes:
result: Validation decision (approved/rejected/clarification)
confidence: Confidence in validation decision (0.0-1.0)
reason: Detailed explanation of decision
suggested_action: What should happen next
modified_params: Any parameter modifications made during validation
warnings: Non-fatal issues detected
execution_approved: Whether tool execution should proceed
"""
result: ValidationResult
confidence: float
reason: str
suggested_action: str
modified_params: Optional[Dict[str, Any]] = None
warnings: List[str] = None
execution_approved: bool = False
class ProposalValidator:
"""
Main validator for LLM router proposals
This orchestrates validation focused on tool selection,
task routing, and security policies.
"""
def __init__(self):
"""Initialize validator with focused sub-validators"""
self.tool_validator = ToolValidator()
self.security_validator = SecurityValidator()
# TODO: Add future validators for orchestration
# self.service_health_validator = ServiceHealthValidator()
# self.rate_limit_validator = RateLimitValidator()
# self.emergency_priority_validator = EmergencyPriorityValidator()
# self.workflow_orchestrator = WorkflowOrchestrator()
logging.info("🛡️ ProposalValidator initialized (tool/task focus)")
async def validate_proposal(
self,
proposal: ToolProposal,
user_context: Optional[Dict[str, Any]] = None
) -> ValidationOutcome:
"""
Validate LLM router proposal for tool selection and security
This focuses on routing decisions and delegates detailed
parameter validation to the individual tools.
Args:
proposal: LLM router proposal to validate
user_context: Additional context (user_id, session, etc.)
Returns:
ValidationOutcome with routing decision and reasoning
"""
start_time = datetime.now()
logging.info(f"🔍 Validating routing for: {proposal.tool}/{proposal.task}")
try:
# 1. Tool and task selection validation
tool_result = await self.tool_validator.validate(proposal)
if tool_result.result != ValidationResult.APPROVED:
return tool_result
# 2. Security policy validation
security_result = self.security_validator.validate(proposal, user_context)
if security_result.result != ValidationResult.APPROVED:
return security_result
# TODO: Future orchestration checks
# 3. Multi-tool workflow validation
# workflow_result = self.workflow_orchestrator.validate(proposal, user_context)
# if workflow_result.result != ValidationResult.APPROVED:
# return workflow_result
# 4. Service health validation
# health_result = await self.service_health_validator.validate(proposal)
# if health_result.result != ValidationResult.APPROVED:
# return health_result
# 5. Rate limiting validation
# rate_result = self.rate_limit_validator.validate(proposal, user_context)
# if rate_result.result != ValidationResult.APPROVED:
# return rate_result
# All validations passed
validation_time = (datetime.now() - start_time).total_seconds()
logging.info(f"✅ Routing validated successfully ({validation_time:.3f}s)")
return ValidationOutcome(
result=ValidationResult.APPROVED,
confidence=1.0,
reason="Tool selection and security validation passed",
suggested_action="Execute tool with provided parameters (detailed validation delegated to tool)",
execution_approved=True,
warnings=self._collect_warnings([tool_result, security_result])
)
except Exception as e:
logging.error(f"❌ Validation error: {e}")
return ValidationOutcome(
result=ValidationResult.REJECTED,
confidence=1.0,
reason=f"Validation system error: {str(e)}",
suggested_action="Request user to try again later",
execution_approved=False
)
def _collect_warnings(self, validation_results: List[ValidationOutcome]) -> List[str]:
"""Collect warnings from all validation results"""
warnings = []
for result in validation_results:
if result.warnings:
warnings.extend(result.warnings)
return warnings
class ToolValidator:
"""
Validates tool selection and task routing decisions
This ensures requested tools and tasks exist and are properly
configured, focusing on routing decisions rather than detailed
parameter validation.
"""
def __init__(self):
"""Initialize with available tools from configuration"""
self._load_tool_registry()
def _load_tool_registry(self):
"""Load tool specifications from configuration"""
try:
import os
# Load tool registry configuration
config_path = os.path.join(os.path.dirname(__file__), 'config', 'tool_registry.yaml')
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
# Build available_tools from registry configuration
self.available_tools = {}
if "tools" in config:
for tool_name, tool_config in config["tools"].items():
# Extract tasks and their configurations
tasks = tool_config.get("tasks", {})
self.available_tools[tool_name] = {
"name": tool_config.get("name", tool_name),
"display_name": tool_config.get("display_name", tool_name),
"description": tool_config.get("description", ""),
"enabled": tool_config.get("enabled", True),
"priority": tool_config.get("priority", 0.5),
"capabilities": tool_config.get("capabilities", []),
"geographic_scope": tool_config.get("geographic_scope", "liguria"),
"data_types": tool_config.get("data_types", []),
"tasks": list(tasks.keys()),
"task_configs": tasks
}
logging.info(f"🔧 ToolValidator loaded registry: {list(self.available_tools.keys())}")
except Exception as e:
# Fallback to basic configuration
logging.warning(f"⚠️ Could not load tool registry, using fallback: {e}")
self.available_tools = {
"omirl_tool": {
"name": "omirl_tool",
"display_name": "OMIRL Weather Monitoring",
"description": "Real-time weather monitoring for Liguria region",
"enabled": True,
"priority": 0.8,
"capabilities": ["real_time_weather_data", "station_monitoring"],
"geographic_scope": "liguria",
"data_types": ["real_time", "observational"],
"tasks": ["valori_stazioni", "massimi_precipitazione"],
"task_configs": {
"valori_stazioni": {"enabled": True},
"massimi_precipitazione": {"enabled": True}
}
}
}
async def validate(self, proposal: ToolProposal) -> ValidationOutcome:
"""
Validate tool selection and task routing
Args:
proposal: Tool proposal to validate
Returns:
ValidationOutcome for routing validation
"""
# Check tool exists and is enabled
if proposal.tool not in self.available_tools:
available_tools = list(self.available_tools.keys())
return ValidationOutcome(
result=ValidationResult.REJECTED,
confidence=1.0,
reason=f"Tool '{proposal.tool}' not available",
suggested_action=f"Available tools: {', '.join(available_tools)}"
)
tool_spec = self.available_tools[proposal.tool]
# Check if tool is enabled
if not tool_spec.get("enabled", True):
return ValidationOutcome(
result=ValidationResult.REJECTED,
confidence=1.0,
reason=f"Tool '{proposal.tool}' is currently disabled",
suggested_action="Contact administrator or try alternative tools"
)
# Check task exists within tool
if proposal.task not in tool_spec["tasks"]:
available_tasks = ", ".join(tool_spec["tasks"])
return ValidationOutcome(
result=ValidationResult.REJECTED,
confidence=1.0,
reason=f"Task '{proposal.task}' not available for {proposal.tool}",
suggested_action=f"Available tasks for {tool_spec['display_name']}: {available_tasks}"
)
# Check if specific task is enabled
task_config = tool_spec["task_configs"].get(proposal.task, {})
if not task_config.get("enabled", True):
return ValidationOutcome(
result=ValidationResult.REJECTED,
confidence=1.0,
reason=f"Task '{proposal.task}' is currently disabled for {proposal.tool}",
suggested_action=f"Try other available tasks: {', '.join([t for t in tool_spec['tasks'] if tool_spec['task_configs'].get(t, {}).get('enabled', True)])}"
)
# Basic parameter structure validation
if not isinstance(proposal.params, dict):
return ValidationOutcome(
result=ValidationResult.REJECTED,
confidence=1.0,
reason="Parameters must be provided as a dictionary",
suggested_action="Provide parameters in correct format"
)
# Log routing decision for debugging
logging.info(f"🎯 Routing approved: {proposal.tool}/{proposal.task} (priority: {tool_spec.get('priority', 'unknown')})")
# TODO: Add multi-tool orchestration logic here
# TODO: Add task sequencing validation
# TODO: Add capability matching validation
return ValidationOutcome(
result=ValidationResult.APPROVED,
confidence=1.0,
reason=f"Tool routing validated: {tool_spec['display_name']} -> {proposal.task}",
suggested_action="Proceed with tool execution (parameter validation delegated to tool)",
execution_approved=True
)
class SecurityValidator:
"""
Validates security policies and access controls
This enforces security policies to prevent abuse
and ensure compliance with operational guidelines.
"""
# TODO: Load from configuration
BLOCKED_PATTERNS = [
# Potential injection attempts
r"<script.*?>",
r"javascript:",
r"eval\s*\(",
# File system attempts
r"\.\./",
r"/etc/",
r"C:\\",
# Command injection attempts
r";\s*(rm|del|format)",
r"\|\s*(curl|wget)"
]
def validate(
self,
proposal: ToolProposal,
user_context: Optional[Dict[str, Any]] = None
) -> ValidationOutcome:
"""
Validate security policies
Args:
proposal: Tool proposal to validate
user_context: User session information
Returns:
ValidationOutcome for security validation
"""
# Check for malicious patterns in parameters
for param_name, param_value in proposal.params.items():
if isinstance(param_value, str):
for pattern in self.BLOCKED_PATTERNS:
if re.search(pattern, param_value, re.IGNORECASE):
logging.warning(f"🚨 Blocked malicious pattern in {param_name}: {pattern}")
return ValidationOutcome(
result=ValidationResult.REJECTED,
confidence=1.0,
reason=f"Security policy violation in parameter '{param_name}'",
suggested_action="Block request and log security event"
)
# TODO: Add user authentication validation
# TODO: Add session validation
# TODO: Add rate limiting checks
# TODO: Add emergency bypass validation
return ValidationOutcome(
result=ValidationResult.APPROVED,
confidence=1.0,
reason="Security validation passed",
suggested_action="Proceed with security clearance"
)
# TODO: Future orchestration capabilities
class WorkflowOrchestrator:
"""
Orchestrates multi-tool and multi-task workflows
This will handle complex scenarios requiring multiple tools
or sequential task execution.
"""
pass
# TODO: Future service health validator
class ServiceHealthValidator:
"""
Validates service availability before tool execution
This checks if external services (OMIRL, bulletins, etc.)
are available before attempting tool execution.
"""
pass
# TODO: Future rate limiting validator
class RateLimitValidator:
"""
Validates rate limits and usage quotas
This prevents abuse by enforcing per-user and per-session
rate limits on tool execution.
"""
pass
# TODO: Future emergency priority validator
class EmergencyPriorityValidator:
"""
Validates emergency scenarios and priority handling
This provides bypass mechanisms for emergency situations
and ensures critical requests are prioritized.
"""
pass