File size: 10,625 Bytes
97f478c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5f19e92
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97f478c
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
"""Simulated agent state machines with tick-based countdown and seeded failures.

Agent state machine:
  idle → (assign) → working(steps_remaining) → tick countdown →
    when steps_remaining==0: seeded_random < reliability ? completed : failed
"""

import hashlib
import random
from dataclasses import dataclass, field
from typing import Any, Union

from models import AgentInfo


def _deterministic_hash(s: str) -> int:
    """Map a string to a stable unsigned 64-bit integer.

    The built-in ``hash()`` of a ``str`` is salted per interpreter process
    (since Python 3.3), so it cannot seed RNGs reproducibly. Instead, take the
    first 8 bytes of the SHA-256 digest of ``s`` and read them as a big-endian
    integer; the value is identical across restarts and machines.
    """
    leading_bytes = hashlib.sha256(s.encode()).digest()[:8]
    return int.from_bytes(leading_bytes, "big")


def _get_effective_reliability(
    overrides: dict[tuple[str, str], Union[float, list[float]]],
    agent_name: str,
    subtask_type: str,
    attempt_count: int,
    default_reliability: float,
) -> float:
    """Get the reliability for an (agent, subtask_type, attempt) triple.

    Resolution rules:
      * No override for ``(agent_name, subtask_type)``: ``default_reliability``.
      * A scalar override: used for every attempt.
      * A per-attempt list (or tuple) override: indexed by ``attempt_count``,
        clamped into range, so attempts past the end reuse the last entry and
        a negative count uses the first entry. An empty schedule carries no
        information, so ``default_reliability`` is used.

    Args:
        overrides: Maps ``(agent_name, subtask_type)`` to a reliability or a
            per-attempt sequence of reliabilities.
        agent_name: Agent being evaluated.
        subtask_type: Type of the subtask being attempted.
        attempt_count: Attempt index used to pick from a per-attempt schedule.
        default_reliability: The agent's own reliability, used when no usable
            override exists.

    Returns:
        The success probability to compare against the seeded roll.
    """
    override = overrides.get((agent_name, subtask_type))
    if override is None:
        return default_reliability
    if isinstance(override, (list, tuple)):
        if not override:
            # An empty schedule would otherwise index override[-1] -> IndexError.
            return default_reliability
        # Clamp on both sides: a negative index would silently wrap to the end.
        idx = max(0, min(attempt_count, len(override) - 1))
        return override[idx]
    return override


@dataclass
class _AgentState:
    """Internal mutable state for one simulated agent.

    Instances are created and mutated by ``AgentPool``; observations are
    exported separately as ``AgentInfo`` objects by ``AgentPool``.
    """

    # Unique agent name; also the key in AgentPool's agent mapping.
    name: str
    # Subtask types this agent may be assigned.
    capabilities: list[str] = field(default_factory=list)
    # AgentPool sets this to "idle", "working" or "offline".
    status: str = "idle"
    # subtask_id being worked on; None when the agent holds no task.
    current_task: str | None = None
    # subtask_type of current_task; used to look up reliability overrides.
    current_task_type: str | None = None
    # Default success probability when no override applies.
    reliability: float = 1.0
    # Number of ticks an assignment takes (copied into steps_remaining on assign).
    speed: int = 1
    # Budget charged for every tick spent working.
    cost_per_step: float = 1.0
    # Countdown of ticks left; the outcome is rolled once it reaches <= 0.
    steps_remaining: int = 0
    # Attempt index of the current task; selects per-attempt overrides and
    # is part of the RNG seed key.
    attempt_count: int = 0


@dataclass
class TickResult:
    """Result from a single agent completing or failing a task during tick.

    One is produced by ``AgentPool.tick`` for each agent whose countdown
    expired on that tick.
    """

    # Agent that finished (or failed) the work.
    agent_name: str
    # The subtask the agent was assigned.
    subtask_id: str
    # True when the seeded roll fell below the effective reliability.
    succeeded: bool
    # Empty string on success (presumably filled in by the environment from an
    # output template — confirm in caller); a human-readable error on failure.
    output_or_error: str
    # True for failures AgentPool.tick classifies as permanent (e.g. an
    # effective reliability of 0.0), where retrying cannot help.
    is_permanent_failure: bool = False


class AgentPool:
    """Manages simulated specialist agents with tick-based work simulation.

    Agents are tracked by name and move between three statuses:

    * ``"idle"`` -- available; ``assign`` moves a capable idle agent to
      ``"working"`` with a countdown of ``speed`` ticks.
    * ``"working"`` -- every ``tick`` charges ``cost_per_step`` to the shared
      budget and decrements the countdown. When the countdown reaches zero the
      outcome is decided by an RNG seeded from the pool seed plus a stable
      hash of ``(agent, subtask, attempt)``, so identical seeds replay
      identically. The agent stays ``"working"`` until ``release_agent`` is
      called.
    * ``"offline"`` -- set by a scheduled ``dropout`` event; such an agent is
      never reported as idle or capable again.
    """

    def __init__(
        self,
        agent_definitions: list[dict[str, Any]],
        reliability_overrides: dict[tuple[str, str], Union[float, list[float]]],
        seed: int,
    ) -> None:
        """Create the pool.

        Args:
            agent_definitions: One dict per agent with the keys ``name``,
                ``capabilities``, ``reliability``, ``speed`` and
                ``cost_per_step``. If two definitions share a name, the later
                one replaces the earlier one.
            reliability_overrides: Maps ``(agent_name, subtask_type)`` to a
                fixed reliability or a per-attempt list of reliabilities.
                Stored by reference, not copied.
            seed: Base seed combined with a per-task hash for every outcome roll.
        """
        self._agents: dict[str, _AgentState] = {}
        for defn in agent_definitions:
            name = defn["name"]
            self._agents[name] = _AgentState(
                name=name,
                capabilities=list(defn["capabilities"]),
                reliability=defn["reliability"],
                speed=defn["speed"],
                cost_per_step=defn["cost_per_step"],
            )
        self._reliability_overrides = reliability_overrides
        self._seed = seed
        # Total cost accrued by all agents across all ticks.
        self._budget_used: float = 0.0

    # ── State mutations ──

    def assign(
        self,
        agent_name: str,
        subtask_id: str,
        subtask_type: str,
        attempt_count: int,
    ) -> None:
        """Assign a subtask to an idle, capable agent.

        The agent becomes ``"working"`` with ``steps_remaining`` set to its
        current ``speed``. ``attempt_count`` is recorded for per-attempt
        reliability overrides and for seeding the outcome roll.

        Raises:
            KeyError: If ``agent_name`` is unknown.
            ValueError: If the agent is not idle (working or offline) or lacks
                the ``subtask_type`` capability.
        """
        agent = self._get(agent_name)
        if agent.status != "idle":
            raise ValueError(f"Agent '{agent_name}' is not idle (status: {agent.status})")
        if subtask_type not in agent.capabilities:
            raise ValueError(
                f"Agent '{agent_name}' lacks capability '{subtask_type}'"
            )
        agent.status = "working"
        agent.current_task = subtask_id
        agent.current_task_type = subtask_type
        agent.steps_remaining = agent.speed
        agent.attempt_count = attempt_count

    def tick(self, step_number: int) -> list[TickResult]:
        """Advance all working agents by one step.

        For each agent whose status is ``"working"``:
          - accrue cost (``budget_used += cost_per_step``),
          - decrement ``steps_remaining``,
          - once ``steps_remaining`` reaches 0, decide success via seeded RNG.

        Finished agents stay ``"working"`` until ``release_agent`` is called;
        an unreleased agent is charged and reported again on the next tick.

        Args:
            step_number: Current step number; not used by the simulation.

        Returns:
            One ``TickResult`` per agent that finished this tick, in the
            pool's insertion order.
        """
        results: list[TickResult] = []

        for agent in self._agents.values():
            if agent.status != "working":
                continue

            self._budget_used += agent.cost_per_step
            agent.steps_remaining -= 1

            if agent.steps_remaining <= 0:
                results.append(self._resolve_outcome(agent))

        return results

    def _resolve_outcome(self, agent: _AgentState) -> TickResult:
        """Roll the seeded success/failure outcome for an expired countdown."""
        reliability = _get_effective_reliability(
            self._reliability_overrides,
            agent.name,
            agent.current_task_type,
            agent.attempt_count,
            agent.reliability,
        )

        # A process-independent hash keeps each (agent, task, attempt) roll
        # reproducible for a given pool seed.
        task_key = f"{agent.name}:{agent.current_task}:{agent.attempt_count}"
        rng = random.Random(self._seed + _deterministic_hash(task_key))
        succeeded = rng.random() < reliability

        # rng.random() lies in [0, 1), so a reliability <= 0 can never succeed;
        # classify it as permanent so callers do not retry it forever.
        is_permanent = reliability <= 0.0

        if succeeded:
            output_or_error = ""  # environment fills from output_template
        elif is_permanent:
            output_or_error = (
                f"Agent '{agent.name}' lacks required tooling for "
                f"'{agent.current_task_type}' — permanent failure"
            )
        else:
            output_or_error = (
                f"Execution failed on '{agent.current_task}', may succeed on retry"
            )

        return TickResult(
            agent_name=agent.name,
            subtask_id=agent.current_task,
            succeeded=succeeded,
            output_or_error=output_or_error,
            is_permanent_failure=is_permanent and not succeeded,
        )

    def release_agent(self, agent_name: str) -> None:
        """Clear an agent's task after its completion or failure was handled.

        The agent returns to ``"idle"`` unless a dropout event has taken it
        ``"offline"``. A dropped-out agent keeps that status, so releasing it
        cannot bring it back into the pool.

        Raises:
            KeyError: If ``agent_name`` is unknown.
        """
        agent = self._get(agent_name)
        if agent.status != "offline":
            agent.status = "idle"
        agent.current_task = None
        agent.current_task_type = None
        agent.steps_remaining = 0

    def apply_scheduled_events(
        self, step: int, scheduled_events: list[dict[str, Any]]
    ) -> list[dict[str, Any]]:
        """Apply any scheduled events for the current step.

        Each event dict needs ``step``, ``target`` (an agent name) and
        ``event_type``. Supported event types:

        * ``"dropout"`` -- takes the agent offline and drops its current task.
        * ``"degradation"`` -- sets the agent's speed to
          ``params["new_speed"]`` (default: double the current speed). Only
          later assignments see the new speed; a running countdown is kept.

        Events targeting unknown or already-offline agents, and unknown event
        types, are ignored.

        Returns:
            The events that fired, as dicts for the episode log.
        """
        fired: list[dict[str, Any]] = []
        for event in scheduled_events:
            if event["step"] != step:
                continue

            target = event["target"]
            event_type = event["event_type"]
            agent = self._agents.get(target)
            if agent is None or agent.status == "offline":
                continue

            if event_type == "dropout":
                was_working = agent.status == "working"
                released_task = agent.current_task
                agent.status = "offline"
                agent.current_task = None
                agent.current_task_type = None
                agent.steps_remaining = 0
                fired.append({
                    "event_type": "agent_dropout",
                    "agent_name": target,
                    "was_working": was_working,
                    "released_task": released_task,
                })

            elif event_type == "degradation":
                # "params" may be absent or explicitly None in the schedule.
                params = event.get("params") or {}
                new_speed = params.get("new_speed", agent.speed * 2)
                agent.speed = new_speed
                fired.append({
                    "event_type": "agent_degraded",
                    "agent_name": target,
                    "new_speed": new_speed,
                })

        return fired

    # ── Queries ──

    def get_idle_agents(self) -> list[str]:
        """Return names of agents with status 'idle'."""
        return [a.name for a in self._agents.values() if a.status == "idle"]

    def get_capable_agents(self, subtask_type: str) -> list[str]:
        """Return names of idle agents that have the capability for subtask_type."""
        return [
            a.name for a in self._agents.values()
            if a.status == "idle" and subtask_type in a.capabilities
        ]

    def get_active_count(self) -> int:
        """Return count of agents currently working."""
        return sum(1 for a in self._agents.values() if a.status == "working")

    def get_budget_used(self) -> float:
        """Return the total cost accrued by all agents so far."""
        return self._budget_used

    def has_capability(self, agent_name: str, subtask_type: str) -> bool:
        """Return True if the agent exists and lists ``subtask_type`` (any status)."""
        agent = self._agents.get(agent_name)
        return agent is not None and subtask_type in agent.capabilities

    def is_idle(self, agent_name: str) -> bool:
        """Return True if the agent exists and its status is ``"idle"``."""
        agent = self._agents.get(agent_name)
        return agent is not None and agent.status == "idle"

    def is_online(self, agent_name: str) -> bool:
        """Return True if the agent exists and has not dropped out."""
        agent = self._agents.get(agent_name)
        return agent is not None and agent.status != "offline"

    def get_agent_cost(self, agent_name: str) -> float:
        """Return the agent's per-tick cost; raises KeyError if unknown."""
        return self._get(agent_name).cost_per_step

    def get_agent_infos(self) -> list[AgentInfo]:
        """Export current state as list of AgentInfo Pydantic models (for observations)."""
        return [
            AgentInfo(
                name=a.name,
                # Copy so mutating an observation cannot alter internal state.
                capabilities=list(a.capabilities),
                status=a.status,
                current_task=a.current_task,
                reliability=a.reliability,
                speed=a.speed,
                cost_per_step=a.cost_per_step,
            )
            for a in self._agents.values()
        ]

    def get_effective_reliability(
        self, agent_name: str, subtask_type: str, attempt_count: int
    ) -> float:
        """Get the effective reliability for an (agent, subtask_type, attempt) triple.

        Public API for use by the environment to check for permanent failures
        without duplicating reliability override logic.

        Raises:
            KeyError: If ``agent_name`` is unknown.
        """
        agent = self._get(agent_name)
        return _get_effective_reliability(
            self._reliability_overrides,
            agent_name,
            subtask_type,
            attempt_count,
            agent.reliability,
        )

    def _get(self, agent_name: str) -> _AgentState:
        """Return the state for ``agent_name``.

        Raises:
            KeyError: If no agent with that name exists.
        """
        try:
            return self._agents[agent_name]
        except KeyError:
            raise KeyError(f"Unknown agent: '{agent_name}'") from None