tflux2011's picture
Upload 7 files
b2e0e38 verified
# src/safety/escalation_ladder.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RiskLevel(Enum):
LOW = "LOW"
MEDIUM = "MEDIUM"
HIGH = "HIGH"
CRITICAL = "CRITICAL"
class ExecutionStatus(Enum):
SUCCESS = "SUCCESS"
WAITING_FOR_APPROVAL = "WAITING_FOR_APPROVAL"
REJECTED = "REJECTED"
ERROR = "ERROR"
@dataclass
class ExecutionResult:
status: ExecutionStatus
result: Optional[Any] = None
error_message: Optional[str] = None
decision_id: Optional[str] = None
class RiskPolicy(ABC):
@abstractmethod
def get_level(self, tool_name: str) -> RiskLevel:
pass
class Tool(ABC):
@abstractmethod
def run(self, parameters: dict) -> Any:
pass
class ApprovalService(ABC):
@abstractmethod
def create_approval_request(self, tool_name: str, parameters: dict) -> str:
pass
@abstractmethod
def notify_manager(self, decision_id: str) -> None:
pass
class EscalationLadder:
def __init__(
self,
risk_policy: RiskPolicy,
tool: Tool,
approval_service: ApprovalService
):
self._risk_policy = risk_policy
self._tool = tool
self._approval_service = approval_service
def execute_tool(self, tool_name: str, parameters: dict) -> ExecutionResult:
"""Execute a tool with appropriate risk-based escalation."""
if not tool_name or not isinstance(tool_name, str):
logger.error("Invalid tool_name provided")
return ExecutionResult(
status=ExecutionStatus.ERROR,
error_message="Invalid tool_name: must be a non-empty string"
)
if not isinstance(parameters, dict):
logger.error("Invalid parameters provided")
return ExecutionResult(
status=ExecutionStatus.ERROR,
error_message="Invalid parameters: must be a dictionary"
)
try:
risk_level = self._risk_policy.get_level(tool_name)
except Exception as e:
logger.exception("Failed to assess risk level")
return ExecutionResult(
status=ExecutionStatus.ERROR,
error_message=f"Risk assessment failed: {str(e)}"
)
if risk_level == RiskLevel.CRITICAL:
logger.warning(f"CRITICAL risk tool '{tool_name}' blocked")
return ExecutionResult(
status=ExecutionStatus.REJECTED,
error_message="Critical risk tools are not permitted"
)
if risk_level == RiskLevel.HIGH:
return self._handle_high_risk(tool_name, parameters)
if risk_level == RiskLevel.MEDIUM:
logger.info(f"MEDIUM risk tool '{tool_name}' - logging for audit")
return self._execute_with_logging(tool_name, parameters)
# LOW risk - execute immediately
return self._execute_tool(parameters)
def _handle_high_risk(self, tool_name: str, parameters: dict) -> ExecutionResult:
"""Handle high-risk tool execution with approval workflow."""
try:
decision_id = self._approval_service.create_approval_request(
tool_name, parameters
)
self._approval_service.notify_manager(decision_id)
logger.info(f"Approval request created: {decision_id}")
return ExecutionResult(
status=ExecutionStatus.WAITING_FOR_APPROVAL,
decision_id=decision_id
)
except Exception as e:
logger.exception("Failed to create approval request")
return ExecutionResult(
status=ExecutionStatus.ERROR,
error_message=f"Approval workflow failed: {str(e)}"
)
def _execute_with_logging(self, tool_name: str, parameters: dict) -> ExecutionResult:
"""Execute tool with enhanced audit logging."""
logger.info(f"Executing medium-risk tool: {tool_name}")
return self._execute_tool(parameters)
def _execute_tool(self, parameters: dict) -> ExecutionResult:
"""Execute the tool and return result."""
try:
result = self._tool.run(parameters)
return ExecutionResult(status=ExecutionStatus.SUCCESS, result=result)
except Exception as e:
logger.exception("Tool execution failed")
return ExecutionResult(
status=ExecutionStatus.ERROR,
error_message=f"Execution failed: {str(e)}"
)