File size: 9,302 Bytes
325aa05
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
136ea72
325aa05
 
136ea72
325aa05
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
136ea72
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
from __future__ import annotations

# ---------------------------------------------------------------------------
# SENTINEL Scenario Dataset
#
# Each scenario is a list of 20 abstract subtask nodes.
# Subtask fields:
#   id          : unique within the scenario
#   description : what the orchestrator sees (abstract, domain-tagged)
#   domain      : ANALYZE | SYNTHESIZE | VERIFY | PLAN | EXECUTE
#   stakes      : 0.0-1.0  (>= 0.7 = adversarial specialist may trigger)
#   ground_truth: correct answer string (hidden from orchestrator)
#   depends_on  : list of subtask ids that must complete first (DAG edges)
#
# 3 scenario sets x 40 scenarios each = 120 total
#   task1 (easy)   : 40 scenarios β€” 10 subtasks, no adversary
#   task2 (medium) : 40 scenarios β€” 15 subtasks, adversary inactive
#   task3 (hard)   : 40 scenarios β€” 20 subtasks, adversary active
# ---------------------------------------------------------------------------

from typing import TypedDict


class SubTask(TypedDict):
    id: str
    description: str
    domain: str           # ANALYZE | SYNTHESIZE | VERIFY | PLAN | EXECUTE
    stakes: float
    ground_truth: str
    depends_on: list[str]


class Scenario(TypedDict):
    scenario_id: str
    task_type: str
    title: str
    description: str
    subtasks: list[SubTask]


# ---------------------------------------------------------------------------
# Subtask template builders
# ---------------------------------------------------------------------------

def _subtask(
    idx: int,
    domain: str,
    stakes: float,
    depends_on: list[str] | None = None,
) -> SubTask:
    """
    Build a generic abstract subtask.
    Description and ground_truth are domain-keyed but not domain-specific β€”
    the agent only sees the abstract framing, not the domain context.
    """
    sid = f"T{idx:02d}"

    templates = {
        "ANALYZE": {
            "description": f"Analyze the inputs from prior steps and identify the key pattern. "
                           f"(subtask {idx}, stakes={stakes:.1f})",
            "ground_truth": "PATTERN_IDENTIFIED",
        },
        "SYNTHESIZE": {
            "description": f"Synthesize outputs from completed subtasks into a unified result. "
                           f"(subtask {idx}, stakes={stakes:.1f})",
            "ground_truth": "SYNTHESIS_COMPLETE",
        },
        "VERIFY": {
            "description": f"Verify the correctness of the previous subtask result. "
                           f"(subtask {idx}, stakes={stakes:.1f})",
            "ground_truth": "VERIFIED_CORRECT",
        },
        "PLAN": {
            "description": f"Plan the next sequence of actions given current state. "
                           f"(subtask {idx}, stakes={stakes:.1f})",
            "ground_truth": "PLAN_VALID",
        },
        "EXECUTE": {
            "description": f"Execute the planned action and report the outcome. "
                           f"(subtask {idx}, stakes={stakes:.1f})",
            "ground_truth": "EXECUTION_SUCCESS",
        },
    }

    t = templates[domain]
    return SubTask(
        id=sid,
        description=t["description"],
        domain=domain,
        stakes=stakes,
        ground_truth=t["ground_truth"],
        depends_on=depends_on or [],
    )


# ---------------------------------------------------------------------------
# DAG layouts for each task type
# Each entry: (domain, stakes, depends_on_indices)
# depends_on_indices are 1-based subtask numbers
# ---------------------------------------------------------------------------

# Task 1 β€” 10 subtasks, linear chain, no high-stakes nodes
_TASK1_LAYOUT = [
    ("ANALYZE",    0.20, []),
    ("ANALYZE",    0.25, [1]),
    ("VERIFY",     0.20, [2]),
    ("SYNTHESIZE", 0.30, [3]),
    ("VERIFY",     0.25, [4]),
    ("PLAN",       0.30, [5]),
    ("ANALYZE",    0.35, [6]),
    ("SYNTHESIZE", 0.30, [7]),
    ("VERIFY",     0.35, [8]),
    ("EXECUTE",    0.40, [9]),
]

# Task 2 β€” 15 subtasks, branching DAG, medium stakes, no adversary trigger
_TASK2_LAYOUT = [
    ("ANALYZE",    0.20, []),
    ("ANALYZE",    0.25, [1]),
    ("VERIFY",     0.30, [2]),
    ("SYNTHESIZE", 0.35, [3]),
    ("PLAN",       0.40, [4]),
    ("ANALYZE",    0.30, [3]),       # branch from node 3
    ("VERIFY",     0.45, [5, 6]),    # joins branches
    ("EXECUTE",    0.50, [7]),
    ("VERIFY",     0.55, [8]),
    ("SYNTHESIZE", 0.45, [9]),
    ("PLAN",       0.50, [10]),
    ("ANALYZE",    0.55, [11]),
    ("VERIFY",     0.60, [12]),
    ("SYNTHESIZE", 0.60, [13]),
    ("EXECUTE",    0.65, [14]),
]

# Task 3 β€” 20 subtasks, full DAG with adversarial trigger zones (stakes >= 0.7)
_TASK3_LAYOUT = [
    ("ANALYZE",    0.20, []),
    ("ANALYZE",    0.25, [1]),
    ("VERIFY",     0.30, [2]),
    ("SYNTHESIZE", 0.35, [3]),
    ("PLAN",       0.40, [4]),
    ("ANALYZE",    0.30, [3]),       # branch A
    ("VERIFY",     0.45, [5, 6]),    # join A+B
    ("EXECUTE",    0.50, [7]),
    ("VERIFY",     0.55, [8]),
    ("SYNTHESIZE", 0.50, [9]),
    ("PLAN",       0.60, [10]),
    ("ANALYZE",    0.55, [11]),
    ("SYNTHESIZE", 0.65, [12]),
    ("VERIFY",     0.70, [13]),      # ← ADVERSARIAL ZONE START
    ("EXECUTE",    0.75, [14]),      # ← HIGH STAKES
    ("PLAN",       0.80, [15]),      # ← HIGH STAKES
    ("ANALYZE",    0.75, [16]),      # ← HIGH STAKES
    ("VERIFY",     0.85, [17]),      # ← PEAK STAKES
    ("SYNTHESIZE", 0.90, [18]),      # ← PEAK STAKES
    ("EXECUTE",    0.95, [19]),      # ← CRITICAL β€” terminal
]


def _build_scenario(
    scenario_id: str,
    task_type: str,
    layout: list[tuple],
    title_suffix: str,
) -> Scenario:
    subtasks = []
    for i, (domain, stakes, dep_indices) in enumerate(layout, start=1):
        depends_on = [f"T{d:02d}" for d in dep_indices]
        subtasks.append(_subtask(i, domain, stakes, depends_on))

    return Scenario(
        scenario_id=scenario_id,
        task_type=task_type,
        title=f"Multi-Agent Task Workflow {title_suffix}",
        description=(
            f"A {task_type} abstract multi-agent workflow where the orchestrator "
            f"must delegate {len(subtasks)} subtasks across 5 specialists with "
            f"hidden reliability profiles, building trust from behavioral evidence alone."
        ),
        subtasks=subtasks,
    )


# ---------------------------------------------------------------------------
# Generate 40 scenarios per task type
# Scenarios vary only by scenario_id and stakes jitter (+/- 0.05)
# so the trust-calibration challenge is consistent but not identical
# ---------------------------------------------------------------------------

import random as _random


def _jitter_stakes(layout: list[tuple], seed: int, max_jitter: float = 0.05) -> list[tuple]:
    """Apply small random stakes perturbation so each scenario is slightly different."""
    rng = _random.Random(seed)
    return [
        (domain, round(min(0.99, max(0.01, stakes + rng.uniform(-max_jitter, max_jitter))), 2), deps)
        for domain, stakes, deps in layout
    ]


def _generate_scenarios(
    task_type: str,
    layout: list[tuple],
    count: int = 40,
) -> list[Scenario]:
    stable_offsets = {"task1": 101, "task2": 202, "task3": 303}
    scenarios = []
    for i in range(count):
        jittered = _jitter_stakes(layout, seed=i * 100 + stable_offsets[task_type])
        sid = f"SCN-{task_type.upper()}-{i+1:03d}"
        scenarios.append(
            _build_scenario(sid, task_type, jittered, f"#{i+1:03d}")
        )
    return scenarios


# ---------------------------------------------------------------------------
# Public dataset
# ---------------------------------------------------------------------------

TASK1_SCENARIOS: list[Scenario] = _generate_scenarios("task1", _TASK1_LAYOUT,  count=40)
TASK2_SCENARIOS: list[Scenario] = _generate_scenarios("task2", _TASK2_LAYOUT,  count=40)
TASK3_SCENARIOS: list[Scenario] = _generate_scenarios("task3", _TASK3_LAYOUT,  count=40)

ALL_SCENARIOS: list[Scenario] = TASK1_SCENARIOS + TASK2_SCENARIOS + TASK3_SCENARIOS

SCENARIOS_BY_ID: dict[str, Scenario] = {s["scenario_id"]: s for s in ALL_SCENARIOS}

SCENARIOS_BY_TASK: dict[str, list[Scenario]] = {
    "task1": TASK1_SCENARIOS,
    "task2": TASK2_SCENARIOS,
    "task3": TASK3_SCENARIOS,
}


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def get_scenario(scenario_id: str) -> Scenario:
    if scenario_id not in SCENARIOS_BY_ID:
        raise ValueError(f"Unknown scenario_id: {scenario_id}")
    return SCENARIOS_BY_ID[scenario_id]


def sample_scenario(task_type: str, seed: int | None = None) -> Scenario:
    pool = SCENARIOS_BY_TASK.get(task_type)
    if not pool:
        raise ValueError(f"Unknown task_type: {task_type}")
    rng = _random.Random(seed)
    return rng.choice(pool)


def scenario_summary() -> dict:
    return {
        "total": len(ALL_SCENARIOS),
        "task1": len(TASK1_SCENARIOS),
        "task2": len(TASK2_SCENARIOS),
        "task3": len(TASK3_SCENARIOS),
        "subtasks_per_task": {
            "task1": len(_TASK1_LAYOUT),
            "task2": len(_TASK2_LAYOUT),
            "task3": len(_TASK3_LAYOUT),
        },
    }