| """ |
| Resource Broker - grants capability-based rights based on credits, task state, and risk. |
| """ |
| from dataclasses import dataclass |
| from enum import Enum |
| from typing import Any, Dict, List, Optional |
|
|
|
|
class Decision(Enum):
    """Possible outcomes of a capability request handled by ``ResourceBroker``."""

    # Credit balance meets the (urgency-adjusted) threshold.
    ALLOW = "allow"
    # Gaming flags were raised, or the balance is below half the threshold.
    DENY = "deny"
    # High-risk capability requested with a risk score above 0.7.
    REQUIRE_APPROVAL = "require_approval"
    # Medium-risk capability with a balance of at least half the threshold.
    DOWNGRADE = "downgrade"
    # The agent has already been denied more than three times.
    ESCALATE = "escalate"
    # Low- or high-risk capability with a balance of at least half the threshold.
    ASK_JUSTIFICATION = "ask_justification"
|
|
|
|
@dataclass
class ResourceDecision:
    """Result of a single capability request.

    Attributes:
        decision: The verdict reached for the request.
        reason: Human-readable explanation of why the verdict was reached.
        capability: Name of the capability that was requested.
        downgrade_to: Alternative capability offered in place of the requested
            one; ``None`` when no alternative is offered.
    """

    decision: Decision
    reason: str
    capability: str
    downgrade_to: Optional[str] = None
|
|
|
|
class ResourceBroker:
    """
    Capability-based access control for agent resources.

    Risk classes: low (retrieval), medium (model calls), high (file writes, shell).
    Every risk class has a credit threshold. A request is decided by comparing
    the agent's credit balance with that threshold after lowering it for task
    urgency.

    Attributes:
        thresholds: Credit threshold per risk class (a private copy, never the
            caller's dict).
        urgency_boost: Credits subtracted from the threshold per unit of urgency.
        denial_history: Number of balance-based denials recorded per agent id.
        approval_history: Per-agent counter; no method of this class updates it.
    """

    # Risk class of each known capability. Capabilities that are not listed
    # here are treated as "medium" risk.
    RESOURCE_RISK = {
        "model_call": "medium",
        "retrieval_call": "low",
        "verifier_call": "medium",
        "debate_turn": "low",
        "file_write": "high",
        "shell_execute": "high",
        "memory_write": "medium",
        "human_escalation": "high",
        "larger_model": "medium",
    }

    # Credit balance required to be allowed a capability of each risk class.
    DEFAULT_THRESHOLDS = {
        "low": 0.5,
        "medium": 2.0,
        "high": 5.0,
    }

    # An agent with more than this many recorded denials is escalated instead
    # of being denied again.
    ESCALATION_DENIAL_LIMIT = 3

    def __init__(
        self,
        thresholds: Optional[Dict[str, float]] = None,
        urgency_boost: float = 0.5,
    ):
        """Create a broker.

        Args:
            thresholds: Optional per-risk-class overrides. They are layered on
                top of ``DEFAULT_THRESHOLDS``, so a partial dict only changes
                the classes it names.
            urgency_boost: Credits subtracted from a threshold per unit of
                ``task_state["urgency"]``.
        """
        # Build a fresh dict so later set_risk_threshold() calls never mutate
        # the caller's dict or the class-level defaults.
        self.thresholds: Dict[str, float] = {
            **self.DEFAULT_THRESHOLDS,
            **(thresholds or {}),
        }
        self.urgency_boost = urgency_boost
        self.denial_history: Dict[str, int] = {}
        self.approval_history: Dict[str, int] = {}

    def request(
        self,
        capability: str,
        agent_id: str,
        credit_balance: float,
        task_state: Optional[Dict[str, Any]] = None,
        risk_score: float = 0.0,
        gaming_flags: Optional[List[str]] = None,
    ) -> ResourceDecision:
        """Decide whether ``agent_id`` may use ``capability``.

        Checks are applied in order: gaming flags (DENY), high risk score on a
        high-risk capability (REQUIRE_APPROVAL), sufficient balance (ALLOW),
        at least half the required balance (DOWNGRADE for medium risk,
        otherwise ASK_JUSTIFICATION), repeated denials (ESCALATE), and finally
        DENY. A final balance-based DENY is counted in ``denial_history``.

        Args:
            capability: Name of the requested capability.
            agent_id: Identifier used to track the agent's denial count.
            credit_balance: The agent's current credit balance.
            task_state: Optional task info; only the ``"urgency"`` key is read.
            risk_score: Risk estimate for this request.
            gaming_flags: Gaming indicators; any non-empty list denies.

        Returns:
            The ``ResourceDecision`` describing the verdict and its reason.
        """
        return self._evaluate(
            capability,
            agent_id,
            credit_balance,
            task_state,
            risk_score,
            gaming_flags,
            record_denial=True,
        )

    def _evaluate(
        self,
        capability: str,
        agent_id: str,
        credit_balance: float,
        task_state: Optional[Dict[str, Any]],
        risk_score: float,
        gaming_flags: Optional[List[str]],
        *,
        record_denial: bool,
    ) -> ResourceDecision:
        """Compute the decision; update ``denial_history`` only if asked to."""
        task_state = task_state or {}
        gaming_flags = gaming_flags or []

        # Gaming flags override every other consideration.
        if gaming_flags:
            return ResourceDecision(
                decision=Decision.DENY,
                reason=f"Gaming detected: {gaming_flags}",
                capability=capability,
            )

        risk_class = self.RESOURCE_RISK.get(capability, "medium")
        threshold = self.thresholds.get(risk_class, 2.0)

        # Urgent tasks get a lower bar, but never below 0.1 credits. A missing
        # or None urgency counts as zero.
        urgency = task_state.get("urgency") or 0.0
        adjusted_threshold = max(0.1, threshold - urgency * self.urgency_boost)

        # Risky use of a high-risk capability needs sign-off regardless of
        # how many credits the agent holds.
        if risk_class == "high" and risk_score > 0.7:
            return ResourceDecision(
                decision=Decision.REQUIRE_APPROVAL,
                reason=f"High risk score {risk_score:.2f} for {capability}",
                capability=capability,
            )

        if credit_balance >= adjusted_threshold:
            return ResourceDecision(
                decision=Decision.ALLOW,
                reason=f"Balance {credit_balance:.2f} >= threshold {adjusted_threshold:.2f}",
                capability=capability,
            )

        # With at least half the required balance the agent is offered a
        # softer outcome instead of a denial.
        if credit_balance >= adjusted_threshold * 0.5:
            if risk_class == "medium":
                return ResourceDecision(
                    decision=Decision.DOWNGRADE,
                    reason=f"Balance {credit_balance:.2f} below threshold, downgrading",
                    capability=capability,
                    downgrade_to="retrieval_call" if capability != "retrieval_call" else None,
                )
            return ResourceDecision(
                decision=Decision.ASK_JUSTIFICATION,
                reason=f"Balance {credit_balance:.2f} insufficient, justification required",
                capability=capability,
            )

        denials = self.denial_history.get(agent_id, 0)
        if denials > self.ESCALATION_DENIAL_LIMIT:
            return ResourceDecision(
                decision=Decision.ESCALATE,
                reason=f"Agent {agent_id} denied {denials} times, escalating",
                capability=capability,
            )

        # Only real requests count towards escalation; read-only queries such
        # as get_allowed_capabilities() must not inflate the counter.
        if record_denial:
            self.denial_history[agent_id] = denials + 1
        return ResourceDecision(
            decision=Decision.DENY,
            reason=f"Balance {credit_balance:.2f} < threshold {adjusted_threshold:.2f}",
            capability=capability,
        )

    def get_allowed_capabilities(self, agent_id: str, credit_balance: float) -> List[str]:
        """List all capabilities an agent can currently use.

        Each known capability is evaluated with no task state, a risk score of
        zero and no gaming flags. This is a pure query: it does not change
        ``denial_history``.
        """
        return [
            cap
            for cap in self.RESOURCE_RISK
            if self._evaluate(
                cap, agent_id, credit_balance, None, 0.0, None, record_denial=False
            ).decision
            == Decision.ALLOW
        ]

    def set_risk_threshold(self, risk_class: str, threshold: float) -> None:
        """Set (or add) the credit threshold used for ``risk_class``."""
        self.thresholds[risk_class] = threshold
|
|