narcolepticchicken commited on
Commit
0da095b
·
verified ·
1 Parent(s): c5e904d

Upload broker/broker.py

Browse files
Files changed (1) hide show
  1. broker/broker.py +81 -135
broker/broker.py CHANGED
@@ -1,10 +1,9 @@
1
  """
2
- Resource Broker: capability-based rights based on credits, task state, and risk.
3
  """
4
-
5
  from dataclasses import dataclass
6
  from enum import Enum
7
- from typing import Any, Dict, Optional
8
 
9
 
10
  class Decision(Enum):
@@ -19,179 +18,126 @@ class Decision(Enum):
19
  @dataclass
20
  class ResourceDecision:
21
  decision: Decision
22
- resource: str
23
- agent_id: str
24
  reason: str
25
- allowed_quota: Optional[int] = None
26
  downgrade_to: Optional[str] = None
27
- requires_human: bool = False
28
 
29
 
30
  class ResourceBroker:
31
  """
32
- Grants capability-based rights based on:
33
- - credit balance (per capability_scope)
34
- - task state (urgency, progress)
35
- - risk profile (safety class of resource)
36
- - agent behavior history (gaming flags)
37
  """
38
 
39
- # Resource safety classes
40
- SAFETY_LOW = {"model_call_small", "retrieval_call", "debate_turn", "verifier_call"}
41
- SAFETY_MEDIUM = {"model_call_large", "memory_write", "shell_execute"}
42
- SAFETY_HIGH = {"file_write", "human_escalation"}
 
 
 
 
 
 
 
43
 
44
- # Default credit thresholds
45
  DEFAULT_THRESHOLDS = {
46
- "model_call_small": 1.0,
47
- "model_call_large": 10.0,
48
- "retrieval_call": 2.0,
49
- "verifier_call": 5.0,
50
- "debate_turn": 1.0,
51
- "file_write": 15.0,
52
- "shell_execute": 20.0,
53
- "memory_write": 8.0,
54
- "human_escalation": 50.0,
55
  }
56
 
57
  def __init__(
58
  self,
59
  thresholds: Optional[Dict[str, float]] = None,
60
- risk_multiplier: float = 1.5,
61
  ):
62
- self.thresholds = thresholds or dict(self.DEFAULT_THRESHOLDS)
63
- self.risk_multiplier = risk_multiplier
64
- self._agent_flags: Dict[str, Dict[str, Any]] = {}
 
65
 
66
  def request(
67
  self,
68
- resource: str,
69
  agent_id: str,
70
  credit_balance: float,
71
- task_state: Optional[Dict] = None,
72
- agent_flags: Optional[Dict] = None,
 
73
  ) -> ResourceDecision:
74
- """
75
- Decide whether to grant a resource request.
76
- """
77
  task_state = task_state or {}
78
- agent_flags = agent_flags or {}
79
-
80
- # Store flags for audit
81
- self._agent_flags.setdefault(agent_id, {}).update(agent_flags)
82
-
83
- # Determine safety class
84
- if resource in self.SAFETY_LOW:
85
- safety = "low"
86
- elif resource in self.SAFETY_MEDIUM:
87
- safety = "medium"
88
- else:
89
- safety = "high"
90
-
91
- # Adjust threshold based on safety and task urgency
92
- threshold = self.thresholds.get(resource, 10.0)
93
- urgency = task_state.get("urgency", 0.5)
94
- progress = task_state.get("progress", 0.0)
95
- gaming_score = agent_flags.get("gaming_score", 0.0)
96
-
97
- # Risk-adjusted threshold
98
- if safety == "medium":
99
- threshold *= self.risk_multiplier
100
- elif safety == "high":
101
- threshold *= self.risk_multiplier * 2.0
102
-
103
- # Urgency can lower threshold slightly
104
- threshold *= max(0.5, 1.0 - urgency * 0.3)
105
-
106
- # Progress bonus: as task progresses, lower threshold (momentum)
107
- threshold *= max(0.7, 1.0 - progress * 0.3)
108
-
109
- # Gaming flags override
110
- if gaming_score > 0.5:
111
  return ResourceDecision(
112
- decision=Decision.ESCALATE,
113
- resource=resource,
114
- agent_id=agent_id,
115
- reason=f"Gaming detected (score={gaming_score:.2f}). Escalating to human.",
116
- requires_human=True,
117
  )
118
 
119
- if gaming_score > 0.2:
 
120
  return ResourceDecision(
121
  decision=Decision.REQUIRE_APPROVAL,
122
- resource=resource,
123
- agent_id=agent_id,
124
- reason=f"Suspicious activity (score={gaming_score:.2f}). Approval required.",
125
- requires_human=True,
126
  )
127
 
128
- # Main decision logic
129
- if credit_balance >= threshold:
130
- if safety == "high":
131
- return ResourceDecision(
132
- decision=Decision.REQUIRE_APPROVAL,
133
- resource=resource,
134
- agent_id=agent_id,
135
- reason=f"High-safety resource. Credit sufficient ({credit_balance:.1f} >= {threshold:.1f}) but approval needed.",
136
- requires_human=True,
137
- )
138
  return ResourceDecision(
139
  decision=Decision.ALLOW,
140
- resource=resource,
141
- agent_id=agent_id,
142
- reason=f"Credit sufficient ({credit_balance:.1f} >= {threshold:.1f}). Safety={safety}.",
143
- allowed_quota=int(credit_balance / max(1.0, threshold)),
144
- )
145
-
146
- # Credit insufficient — consider downgrade for model calls
147
- if resource == "model_call_large" and credit_balance >= self.thresholds.get("model_call_small", 1.0):
148
- return ResourceDecision(
149
- decision=Decision.DOWNGRADE,
150
- resource=resource,
151
- agent_id=agent_id,
152
- reason=f"Insufficient credit for large model ({credit_balance:.1f} < {threshold:.1f}). Downgrading to small.",
153
- downgrade_to="model_call_small",
154
  )
155
 
156
- if resource == "shell_execute" and credit_balance >= self.thresholds.get("file_write", 15.0):
 
 
 
 
 
 
 
 
157
  return ResourceDecision(
158
- decision=Decision.DOWNGRADE,
159
- resource=resource,
160
- agent_id=agent_id,
161
- reason=f"Insufficient credit for shell. Downgrading to file_write.",
162
- downgrade_to="file_write",
163
  )
164
 
165
- # Low credit but high urgency -> ask for justification
166
- if urgency > 0.8:
 
167
  return ResourceDecision(
168
- decision=Decision.ASK_JUSTIFICATION,
169
- resource=resource,
170
- agent_id=agent_id,
171
- reason=f"Credit insufficient ({credit_balance:.1f} < {threshold:.1f}) but urgency high. Justification required.",
172
  )
173
 
 
174
  return ResourceDecision(
175
  decision=Decision.DENY,
176
- resource=resource,
177
- agent_id=agent_id,
178
- reason=f"Credit insufficient ({credit_balance:.1f} < {threshold:.1f}).",
179
  )
180
 
181
- def batch_request(
182
- self,
183
- resources: list,
184
- agent_id: str,
185
- credit_balance: float,
186
- task_state: Optional[Dict] = None,
187
- agent_flags: Optional[Dict] = None,
188
- ) -> list:
189
- """Evaluate multiple resource requests, returning decisions."""
190
- return [
191
- self.request(r, agent_id, credit_balance, task_state, agent_flags)
192
- for r in resources
193
- ]
194
-
195
- def audit_log(self, agent_id: str) -> Dict:
196
- """Return agent's flagged behavior history."""
197
- return self._agent_flags.get(agent_id, {})
 
1
  """
2
+ Resource Broker - grants capability-based rights based on credits, task state, and risk.
3
  """
 
4
  from dataclasses import dataclass
5
  from enum import Enum
6
+ from typing import Any, Dict, List, Optional
7
 
8
 
9
  class Decision(Enum):
 
18
  @dataclass
19
  class ResourceDecision:
20
  decision: Decision
 
 
21
  reason: str
22
+ capability: str
23
  downgrade_to: Optional[str] = None
 
24
 
25
 
26
  class ResourceBroker:
27
  """
28
+ Capability-based access control for agent resources.
29
+ Risk classes: low (retrieval), medium (model calls), high (file writes, shell).
 
 
 
30
  """
31
 
32
+ RESOURCE_RISK = {
33
+ "model_call": "medium",
34
+ "retrieval_call": "low",
35
+ "verifier_call": "medium",
36
+ "debate_turn": "low",
37
+ "file_write": "high",
38
+ "shell_execute": "high",
39
+ "memory_write": "medium",
40
+ "human_escalation": "high",
41
+ "larger_model": "medium",
42
+ }
43
 
 
44
  DEFAULT_THRESHOLDS = {
45
+ "low": 0.5,
46
+ "medium": 2.0,
47
+ "high": 5.0,
 
 
 
 
 
 
48
  }
49
 
50
  def __init__(
51
  self,
52
  thresholds: Optional[Dict[str, float]] = None,
53
+ urgency_boost: float = 0.5,
54
  ):
55
+ self.thresholds = thresholds or self.DEFAULT_THRESHOLDS.copy()
56
+ self.urgency_boost = urgency_boost
57
+ self.denial_history: Dict[str, int] = {}
58
+ self.approval_history: Dict[str, int] = {}
59
 
60
  def request(
61
  self,
62
+ capability: str,
63
  agent_id: str,
64
  credit_balance: float,
65
+ task_state: Optional[Dict[str, Any]] = None,
66
+ risk_score: float = 0.0,
67
+ gaming_flags: Optional[List[str]] = None,
68
  ) -> ResourceDecision:
 
 
 
69
  task_state = task_state or {}
70
+ gaming_flags = gaming_flags or []
71
+ risk_class = self.RESOURCE_RISK.get(capability, "medium")
72
+ threshold = self.thresholds.get(risk_class, 2.0)
73
+
74
+ # Adjust threshold based on urgency
75
+ urgency = task_state.get("urgency", 0.0)
76
+ adjusted_threshold = max(0.1, threshold - urgency * self.urgency_boost)
77
+
78
+ # Gaming detection overrides
79
+ if gaming_flags:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
80
  return ResourceDecision(
81
+ decision=Decision.DENY,
82
+ reason=f"Gaming detected: {gaming_flags}",
83
+ capability=capability,
 
 
84
  )
85
 
86
+ # High-risk resources with high risk score
87
+ if risk_class == "high" and risk_score > 0.7:
88
  return ResourceDecision(
89
  decision=Decision.REQUIRE_APPROVAL,
90
+ reason=f"High risk score {risk_score:.2f} for {capability}",
91
+ capability=capability,
 
 
92
  )
93
 
94
+ # Credit check
95
+ if credit_balance >= adjusted_threshold:
 
 
 
 
 
 
 
 
96
  return ResourceDecision(
97
  decision=Decision.ALLOW,
98
+ reason=f"Balance {credit_balance:.2f} >= threshold {adjusted_threshold:.2f}",
99
+ capability=capability,
 
 
 
 
 
 
 
 
 
 
 
 
100
  )
101
 
102
+ # Near-threshold: downgrade or ask for justification
103
+ if credit_balance >= adjusted_threshold * 0.5:
104
+ if risk_class == "medium":
105
+ return ResourceDecision(
106
+ decision=Decision.DOWNGRADE,
107
+ reason=f"Balance {credit_balance:.2f} below threshold, downgrading",
108
+ capability=capability,
109
+ downgrade_to="retrieval_call" if capability != "retrieval_call" else None,
110
+ )
111
  return ResourceDecision(
112
+ decision=Decision.ASK_JUSTIFICATION,
113
+ reason=f"Balance {credit_balance:.2f} insufficient, justification required",
114
+ capability=capability,
 
 
115
  )
116
 
117
+ # Escalation if repeated denials
118
+ denials = self.denial_history.get(agent_id, 0)
119
+ if denials > 3:
120
  return ResourceDecision(
121
+ decision=Decision.ESCALATE,
122
+ reason=f"Agent {agent_id} denied {denials} times, escalating",
123
+ capability=capability,
 
124
  )
125
 
126
+ self.denial_history[agent_id] = denials + 1
127
  return ResourceDecision(
128
  decision=Decision.DENY,
129
+ reason=f"Balance {credit_balance:.2f} < threshold {adjusted_threshold:.2f}",
130
+ capability=capability,
 
131
  )
132
 
133
+ def get_allowed_capabilities(self, agent_id: str, credit_balance: float) -> List[str]:
134
+ """List all capabilities an agent can currently use."""
135
+ allowed = []
136
+ for cap in self.RESOURCE_RISK:
137
+ dec = self.request(cap, agent_id, credit_balance)
138
+ if dec.decision == Decision.ALLOW:
139
+ allowed.append(cap)
140
+ return allowed
141
+
142
+ def set_risk_threshold(self, risk_class: str, threshold: float) -> None:
143
+ self.thresholds[risk_class] = threshold