# occ-stack/broker/broker.py
# narcolepticchicken's picture
# Upload broker/broker.py
# 0da095b verified
"""
Resource Broker - grants capability-based rights based on credits, task state, and risk.
"""
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional
class Decision(Enum):
    """Outcome the broker can return for a capability request."""

    # Request is granted as asked.
    ALLOW = "allow"
    # Request is refused.
    DENY = "deny"
    # Request needs an explicit approval before it can proceed.
    REQUIRE_APPROVAL = "require_approval"
    # Request is refused as asked; a cheaper capability is offered instead.
    DOWNGRADE = "downgrade"
    # Request is handed off after repeated denials.
    ESCALATE = "escalate"
    # Requester must justify the request.
    ASK_JUSTIFICATION = "ask_justification"
@dataclass
class ResourceDecision:
    """Result of a single broker evaluation for one capability request."""

    # The verdict itself.
    decision: Decision
    # Human-readable explanation of why this verdict was reached.
    reason: str
    # The capability that was requested.
    capability: str
    # Alternative capability offered with a downgrade verdict; None otherwise.
    downgrade_to: Optional[str] = None
class ResourceBroker:
    """
    Capability-based access control for agent resources.

    Risk classes: low (retrieval), medium (model calls), high (file writes, shell).

    A request is decided in this order:

    1. Any gaming flag -> DENY.
    2. High-risk capability with ``risk_score > 0.7`` -> REQUIRE_APPROVAL.
    3. Balance at or above the urgency-adjusted threshold -> ALLOW.
    4. Balance at or above half of that threshold -> DOWNGRADE for
       medium-risk capabilities, ASK_JUSTIFICATION otherwise.
    5. Otherwise DENY, or ESCALATE once the agent already has more than
       three recorded denials.
    """

    # Risk class per capability. Capabilities missing from this table are
    # treated as "medium" by the lookup in _evaluate().
    RESOURCE_RISK = {
        "model_call": "medium",
        "retrieval_call": "low",
        "verifier_call": "medium",
        "debate_turn": "low",
        "file_write": "high",
        "shell_execute": "high",
        "memory_write": "medium",
        "human_escalation": "high",
        "larger_model": "medium",
    }

    # Credit balance required for an outright ALLOW, per risk class.
    DEFAULT_THRESHOLDS = {
        "low": 0.5,
        "medium": 2.0,
        "high": 5.0,
    }

    def __init__(
        self,
        thresholds: Optional[Dict[str, float]] = None,
        urgency_boost: float = 0.5,
    ):
        """
        Args:
            thresholds: Credit threshold per risk class. ``None`` or an empty
                mapping selects ``DEFAULT_THRESHOLDS``.
            urgency_boost: Amount subtracted from the threshold per unit of
                ``task_state["urgency"]``.
        """
        # Always keep a private copy: set_risk_threshold() mutates this dict
        # and must not leak into the caller's mapping (the default path
        # already copied; the caller-supplied path did not).
        self.thresholds = (
            dict(thresholds) if thresholds else self.DEFAULT_THRESHOLDS.copy()
        )
        self.urgency_boost = urgency_boost
        # Credit-based denials recorded per agent id; drives escalation.
        self.denial_history: Dict[str, int] = {}
        # Not read or written anywhere in this class.
        self.approval_history: Dict[str, int] = {}

    def request(
        self,
        capability: str,
        agent_id: str,
        credit_balance: float,
        task_state: Optional[Dict[str, Any]] = None,
        risk_score: float = 0.0,
        gaming_flags: Optional[List[str]] = None,
    ) -> ResourceDecision:
        """
        Decide whether ``agent_id`` may use ``capability`` right now.

        A credit-based DENY is recorded in ``denial_history``; once an agent
        has more than three recorded denials, further would-be denials are
        returned as ESCALATE instead (and are not counted again).

        Args:
            capability: Requested capability name.
            agent_id: Identifier used to track repeated denials.
            credit_balance: The agent's current credit balance.
            task_state: Optional task info; only the ``"urgency"`` key is read.
            risk_score: Risk estimate; values above 0.7 force approval for
                high-risk capabilities.
            gaming_flags: Non-empty list causes an immediate DENY.

        Returns:
            The decision together with a human-readable reason.
        """
        return self._evaluate(
            capability,
            agent_id,
            credit_balance,
            task_state,
            risk_score,
            gaming_flags,
            record_denial=True,
        )

    def _evaluate(
        self,
        capability: str,
        agent_id: str,
        credit_balance: float,
        task_state: Optional[Dict[str, Any]] = None,
        risk_score: float = 0.0,
        gaming_flags: Optional[List[str]] = None,
        *,
        record_denial: bool,
    ) -> ResourceDecision:
        """Compute the decision; touch ``denial_history`` only if *record_denial*."""
        task_state = task_state or {}
        gaming_flags = gaming_flags or []
        risk_class = self.RESOURCE_RISK.get(capability, "medium")
        threshold = self.thresholds.get(risk_class, 2.0)

        # Urgency lowers the bar, but the threshold never drops below 0.1.
        urgency = task_state.get("urgency", 0.0)
        adjusted_threshold = max(0.1, threshold - urgency * self.urgency_boost)

        # Gaming detection overrides everything else.
        if gaming_flags:
            return ResourceDecision(
                decision=Decision.DENY,
                reason=f"Gaming detected: {gaming_flags}",
                capability=capability,
            )

        # High-risk resources with a high risk score need approval no matter
        # how many credits the agent holds.
        if risk_class == "high" and risk_score > 0.7:
            return ResourceDecision(
                decision=Decision.REQUIRE_APPROVAL,
                reason=f"High risk score {risk_score:.2f} for {capability}",
                capability=capability,
            )

        # Credit check.
        if credit_balance >= adjusted_threshold:
            return ResourceDecision(
                decision=Decision.ALLOW,
                reason=f"Balance {credit_balance:.2f} >= threshold {adjusted_threshold:.2f}",
                capability=capability,
            )

        # Near-threshold (at least half): downgrade or ask for justification.
        if credit_balance >= adjusted_threshold * 0.5:
            if risk_class == "medium":
                return ResourceDecision(
                    decision=Decision.DOWNGRADE,
                    reason=f"Balance {credit_balance:.2f} below threshold, downgrading",
                    capability=capability,
                    downgrade_to="retrieval_call" if capability != "retrieval_call" else None,
                )
            return ResourceDecision(
                decision=Decision.ASK_JUSTIFICATION,
                reason=f"Balance {credit_balance:.2f} insufficient, justification required",
                capability=capability,
            )

        # Escalate instead of denying once the agent has been denied > 3 times.
        denials = self.denial_history.get(agent_id, 0)
        if denials > 3:
            return ResourceDecision(
                decision=Decision.ESCALATE,
                reason=f"Agent {agent_id} denied {denials} times, escalating",
                capability=capability,
            )

        # Only real requests count toward escalation; read-only probes
        # (get_allowed_capabilities) pass record_denial=False.
        if record_denial:
            self.denial_history[agent_id] = denials + 1
        return ResourceDecision(
            decision=Decision.DENY,
            reason=f"Balance {credit_balance:.2f} < threshold {adjusted_threshold:.2f}",
            capability=capability,
        )

    def get_allowed_capabilities(self, agent_id: str, credit_balance: float) -> List[str]:
        """
        List all capabilities an agent can currently use.

        Evaluates every capability in ``RESOURCE_RISK`` with default task
        state, risk score and gaming flags, and keeps those decided as ALLOW.
        This is a read-only query: it does not record denials, so listing
        capabilities cannot push an agent toward escalation.
        """
        return [
            cap
            for cap in self.RESOURCE_RISK
            if self._evaluate(
                cap, agent_id, credit_balance, record_denial=False
            ).decision
            == Decision.ALLOW
        ]

    def set_risk_threshold(self, risk_class: str, threshold: float) -> None:
        """Set (or add) the credit threshold for ``risk_class`` on this broker."""
        self.thresholds[risk_class] = threshold