File size: 63,843 Bytes
02ff91f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c699da7
 
 
02ff91f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a5c7dd0
 
 
 
 
 
 
 
 
 
02ff91f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
"""
SpindleFlowEnv β€” Main RL environment.
Gymnasium-compatible. Wraps SpindleFlow as the execution backend.
LSTM-policy-safe: state representation is complete per-step (no hidden history).

The environment does NOT call SpindleFlow for every episode during training β€”
that would be too slow and expensive. Instead, for Phase 1/2 training it uses
a simulated specialist execution (fast, free). For evaluation and demo, it
calls real SpindleFlow.
"""

from __future__ import annotations
import time
import numpy as np
import gymnasium as gym
from gymnasium import spaces
from pathlib import Path
from typing import Optional, Any
import yaml

from env.specialist_registry import SpecialistRegistry
from env.delegation_graph import DelegationGraph
from env.scratchpad import SharedScratchpad
from env.state import build_state, EpisodeState
from env.action_space import ActionDecoder, MetaAction, FactoredAction, DelegationMode
from reward.tier_lock import EpisodeTierLock
from reward.tiered_reward import TieredRewardScorer
from reward.latency_reward import LatencySLAConfig, compute_latency_penalty
from reward.failure_reward import (
    SpecialistResult, SpecialistStatus,
    compute_failure_penalty, compute_recovery_bonus,
)
from reward.conflict_reward import detect_conflicts
from reward.consistency_tracker import PathConsistencyTracker
from agents.task_decomposer import TaskDecomposer, EnrichedTask
from agents.conflict_resolver import ConflictResolver
from agents.fallback_chain import FallbackChainResolver
from agents.specialist_memory import SpecialistMemory
from training.spawn_memory import SpawnMemory, SpawnRecord
from training.task_bank import TaskBank


class SpindleFlowEnv(gym.Env):
    """
    RL Environment for SpindleFlow delegation policy training.

    Episode structure:
      1. Reset: Draw task from task bank, embed it, lock tier, set up components
      2. Step loop: Policy chooses action β†’ environment executes β†’ compute reward
      3. Termination: STOP action, max_steps reached, or episode error

    Observation space: Flat vector (see EpisodeState.observation_dim())
    Action space: Box (continuous β€” decoded by ActionDecoder)
    """

    metadata = {"render_modes": ["human"]}

    def __init__(
        self,
        config_path: str = "configs/training_config.yaml",
        catalog_path: str = "configs/specialist_catalog.yaml",
        use_real_spindleflow: bool = False,
        phase: int = 1,
        render_mode: Optional[str] = None,
        simulate_specialists: bool = False,
    ):
        super().__init__()

        with open(config_path) as f:
            self.config = yaml.safe_load(f)

        env_cfg = self.config["environment"]
        self.max_steps = env_cfg["max_steps_per_episode"]
        self.max_depth = env_cfg["max_delegation_depth"]
        self.max_specialists = env_cfg.get("max_specialists_per_episode", 6)
        self.specialist_timeout_ms = env_cfg["specialist_timeout_ms"]
        self.phase = phase
        self.use_real_spindleflow = use_real_spindleflow
        self.render_mode = render_mode
        # When True: per-step specialist calls use simulation even if OPENAI_API_KEY
        # is set. Episode-level self-learning (finetuner, spawn) still use the key.
        self.simulate_specialists = simulate_specialists

        reward_cfg = self.config["reward"]
        self.latency_sla = LatencySLAConfig(
            budget_ms=10000.0,
            weight=reward_cfg["latency_weight"],
        )

        # Initialize components
        self.registry = SpecialistRegistry(catalog_path)
        self.task_bank = TaskBank(
            phase=phase,
            config_path=config_path,
            catalog_path=catalog_path,
        )
        # Load sector contradiction pairs from catalog (for conflict detection)
        with open(catalog_path) as _f:
            _catalog_meta = yaml.safe_load(_f).get("metadata", {})
        self._contradiction_pairs = [
            tuple(pair) for pair in _catalog_meta.get("contradiction_pairs", [])
        ]

        self.task_decomposer = TaskDecomposer(sector_cfg=self.config.get("sector", {}))
        _resolution_mem_path = self.config.get("agents", {}).get(
            "resolution_memory_path", "data/resolution_memory.jsonl"
        )
        self.conflict_resolver = ConflictResolver(
            config=self.config,
            memory_path=_resolution_mem_path,
        )
        self.fallback_resolver = FallbackChainResolver()
        self.reward_scorer = TieredRewardScorer(registry=self.registry)
        self.consistency_tracker = PathConsistencyTracker(
            specialist_ids=self.registry.list_ids()
        )
        si_cfg = self.config.get("specialist_improvement", {})
        memory_path = si_cfg.get("memory_path", "data/specialist_memory.json")
        self.specialist_memory = SpecialistMemory(path=memory_path)

        spawn_mem_path = env_cfg.get("spawn_memory_path", "data/spawn_memory.jsonl")
        self._spawn_memory = SpawnMemory(
            path=spawn_mem_path,
            max_entries=env_cfg.get("spawn_memory_max_entries", 500),
        )
        self._pending_spawn_records: list[SpawnRecord] = []
        self.action_decoder = ActionDecoder(
            specialist_ids=self.registry.list_ids(),
            max_specialists=self.max_specialists,
        )

        # Spawn config
        self.spawn_threshold: float = env_cfg.get("spawn_threshold", 0.50)
        self.auto_spawn: bool = env_cfg.get("auto_spawn_specialists", True)
        # Max total spawned specialists across the lifetime of this env instance.
        # Caps registry growth so the observation space stays stable during long runs.
        self._spawn_max_total: int = env_cfg.get("spawn_max_total", 8)
        # Minimum episodes between consecutive spawns β€” prevents burst-spawning on
        # a streak of low-similarity tasks and keeps the action decoder stable.
        self._spawn_cooldown_episodes: int = env_cfg.get("spawn_cooldown_episodes", 20)
        # Lifetime counters (survive across resets)
        self._spawn_total_count: int = 0
        self._last_spawn_episode: int = -999  # episode index of last spawn
        self._episode_index: int = 0

        # Per-episode state
        self.delegation_graph = DelegationGraph(max_depth=self.max_depth)
        self.scratchpad = SharedScratchpad()
        self.current_task: Optional[EnrichedTask] = None
        self.tier_lock: Optional[EpisodeTierLock] = None
        self.specialist_results: list[SpecialistResult] = []
        self.called_ids: list[str] = []
        self.step_count: int = 0
        self.episode_start_ms: float = 0.0
        self.generalist_baseline: str = ""
        self.config_reward = reward_cfg
        self._last_reward_components: dict = {}
        self._last_factored_action: Optional[Any] = None
        # Active roster for this episode (top-K by task similarity, including spawned)
        self.active_specialist_ids: list[str] = self.registry.list_ids()[:self.max_specialists]
        self.spawned_this_episode: list[str] = []
        # Task embedding cached at reset() β€” constant within an episode, no need to re-embed each step
        self._task_emb: np.ndarray | None = None

        # Spaces
        obs_dim = EpisodeState.observation_dim(self.max_specialists)
        self.observation_space = spaces.Box(
            low=-10.0, high=10.0, shape=(obs_dim,), dtype=np.float32
        )
        self.action_space = spaces.Box(
            low=-1.0, high=1.0,
            shape=(self.action_decoder.get_action_dim(),),
            dtype=np.float32,
        )

    def reset(
        self,
        seed: Optional[int] = None,
        options: Optional[dict] = None,
    ) -> tuple[np.ndarray, dict]:
        super().reset(seed=seed)

        self.delegation_graph.reset()
        self.scratchpad.reset(episode_id=str(time.time()))
        self.specialist_results = []
        self.called_ids = []
        self.step_count = 0
        self.episode_start_ms = time.time() * 1000

        task_desc = self.task_bank.sample()
        self.current_task = self.task_decomposer.decompose(task_desc)

        self.tier_lock = EpisodeTierLock.for_task(
            self.current_task.complexity_class
        )

        self.generalist_baseline = self._generate_generalist_baseline(
            self.current_task.enriched_description
        )

        self.delegation_graph.add_root("orchestrator")
        self._episode_index += 1

        task_desc = self.current_task.enriched_description
        task_emb  = self.registry.embed_query(task_desc)
        assert task_emb is not None and task_emb.shape == (384,), (
            f"Task embedding failed: got shape {getattr(task_emb, 'shape', None)}"
        )
        self._task_emb = task_emb   # cached for entire episode β€” task doesn't change

        self.spawned_this_episode = []
        self._pending_spawn_records = []
        # Auto-spawn: if no existing specialist covers this task well, create one via LLM.
        if self.auto_spawn:
            self._maybe_spawn_specialist(task_emb, task_desc)

        # ── Build per-episode active roster (top-K by task similarity) ──
        self.active_specialist_ids = self._select_active_specialists(task_emb)

        # ── Rebuild action decoder to reflect the updated roster ──
        self.action_decoder = ActionDecoder(
            specialist_ids=self.active_specialist_ids,
            max_specialists=self.max_specialists,
        )

        state = build_state(
            task_embedding=task_emb,
            registry=self.registry,
            called_ids=[],
            delegation_graph=self.delegation_graph,
            scratchpad=self.scratchpad,
            step_count=0,
            elapsed_ms=0.0,
            sla_budget_ms=self.latency_sla.budget_ms,
            max_specialists=self.max_specialists,
            max_depth=self.max_depth,
            phase=self.phase,
            active_ids=self.active_specialist_ids,
        )

        info = {
            "task":               task_desc,
            "complexity":         self.current_task.complexity_class,
            "tier":               self.tier_lock.locked_tier.name,
            "active_specialists": list(self.active_specialist_ids),
            "spawned_specialists": list(self.spawned_this_episode),
        }

        return state.to_flat_vector(), info

    def step(
        self, action: np.ndarray
    ) -> tuple[np.ndarray, float, bool, bool, dict]:
        """
        Execute one step in the environment.
        Returns: (observation, reward, terminated, truncated, info)
        """
        self.step_count += 1
        elapsed_ms = time.time() * 1000 - self.episode_start_ms

        # Build specialist mask (enforce DAG constraints)
        valid_ids = self.delegation_graph.get_valid_callees(
            "orchestrator", self.active_specialist_ids
        )
        valid_ids = [sid for sid in valid_ids if sid not in self.called_ids]
        mask = self.action_decoder.build_specialist_mask(valid_ids)

        factored: FactoredAction = self.action_decoder.decode(action, mask)

        assert self._task_emb is not None, (
            "step() called before reset() or task embedding failed in reset()"
        )
        task_emb = self._task_emb

        terminated = False
        truncated = False
        step_results = []

        if factored.meta_action == MetaAction.STOP or self.step_count >= self.max_steps:
            terminated = True
        else:
            step_results = self._dispatch_meta_action(factored, elapsed_ms)
            self.specialist_results.extend(step_results)
            _reg = set(self.registry.list_ids())
            self.called_ids.extend(
                r.specialist_id for r in step_results
                if r.specialist_id in _reg
            )

        if self.step_count >= self.max_steps and not terminated:
            truncated = True
        state = build_state(
            task_embedding=task_emb,
            registry=self.registry,
            called_ids=self.called_ids,
            delegation_graph=self.delegation_graph,
            scratchpad=self.scratchpad,
            step_count=self.step_count,
            elapsed_ms=elapsed_ms,
            sla_budget_ms=self.latency_sla.budget_ms,
            max_specialists=self.max_specialists,
            max_depth=self.max_depth,
            phase=self.phase,
            active_ids=self.active_specialist_ids,
        )

        if terminated or truncated:
            reward = self._compute_final_reward(elapsed_ms)
            self._record_episode_to_memory(reward)
        else:
            reward = self._compute_step_reward(
                step_results, task_emb,
                delegation_mode=factored.delegation_mode,
                meta_action=factored.meta_action,
            )

        step_latencies = {r.specialist_id: r.latency_ms for r in step_results}
        info = {
            # Keys expected by the UI / Streamlit dashboard
            "action_name":           factored.meta_action.name,
            "called_specialists":    list(factored.specialist_ids),
            "delegation_mode":       factored.delegation_mode.name,
            "reward_components":     dict(self._last_reward_components),
            "specialist_latencies":  step_latencies,
            "active_specialists":    list(self.active_specialist_ids),
            "spawned_specialists":   list(self.spawned_this_episode),
            # Raw data for debugging / training callbacks
            "action":                factored.to_log_dict(),
            "called_ids":            list(self.called_ids),
            "step_count":            self.step_count,
            "elapsed_ms":            elapsed_ms,
        }

        return state.to_flat_vector(), reward, terminated, truncated, info

    # ── MetaAction dispatch ───────────────────────────────────────────

    def _dispatch_meta_action(
        self, action: FactoredAction, elapsed_ms: float
    ) -> list[SpecialistResult]:
        """Route to the correct handler based on MetaAction."""
        if action.meta_action == MetaAction.CALL_MEDIATOR:
            return self._exec_meta_mediator(action, elapsed_ms)
        if action.meta_action == MetaAction.CLARIFY_TASK:
            return self._exec_meta_clarify(action, elapsed_ms)
        if action.meta_action == MetaAction.DELEGATE_SUBTASK:
            return self._exec_meta_delegate_subtask(action, elapsed_ms)
        if action.meta_action == MetaAction.RETRY_FAILED:
            return self._exec_meta_retry(action, elapsed_ms)
        if action.meta_action == MetaAction.PARALLEL_SPAWN:
            return self._exec_meta_parallel_spawn(action, elapsed_ms)
        if action.meta_action == MetaAction.SPAWN_SPECIALIST:
            return self._exec_meta_spawn_specialist(action, elapsed_ms)
        return self._execute_action(action, elapsed_ms)   # CALL_SPECIALIST default

    # ── DelegationMode dispatch ───────────────────────────────────────

    def _execute_action(
        self, action: FactoredAction, elapsed_ms: float
    ) -> list[SpecialistResult]:
        """Dispatch to the correct DelegationMode handler."""
        handlers = {
            DelegationMode.SEQUENTIAL:     self._exec_sequential,
            DelegationMode.PARALLEL:       self._exec_parallel,
            DelegationMode.FAN_OUT_REDUCE: self._exec_fan_out_reduce,
            DelegationMode.ITERATIVE:      self._exec_iterative,
            DelegationMode.CONDITIONAL:    self._exec_conditional,
            DelegationMode.PRIORITY_QUEUE: self._exec_priority_queue,
            DelegationMode.BROADCAST:      self._exec_broadcast,
        }
        return handlers.get(action.delegation_mode, self._exec_sequential)(action, elapsed_ms)

    # ── Shared helpers ────────────────────────────────────────────────

    def _can_call(self, sid: str, caller_id: str = "orchestrator") -> bool:
        """True when a specialist is registered, not yet called, and DAG-valid."""
        return (
            sid in self.registry.list_ids()
            and sid not in self.called_ids
            and self.delegation_graph.can_delegate(caller_id, sid)
        )

    def _do_call(
        self,
        sid: str,
        task: str,
        elapsed_ms: float,
        mode: str = "SEQUENTIAL",
        context: str | None = None,
        caller_id: str = "orchestrator",
    ) -> list[SpecialistResult]:
        """
        Validate β†’ record in DAG β†’ call specialist β†’ handle fallback β†’ write scratchpad.

        caller_id controls which node in the delegation graph is the caller.
        Defaults to "orchestrator" for top-level calls. Pass a specialist ID
        to record depth-2 delegations (specialist β†’ sub-specialist).
        Returns a list because a fallback may contribute a second result.
        """
        if not self._can_call(sid, caller_id=caller_id):
            return []
        self.delegation_graph.record_delegation(caller_id, sid, mode)
        result = self._call_specialist(sid, task, elapsed_ms, context=context)
        if result.output:
            self.scratchpad.write(
                author_id=sid,
                author_role=self.registry.get(sid).role,
                content=result.output,
            )
        results = [result]
        if self.fallback_resolver.needs_fallback(result):
            fb_id = self.fallback_resolver.get_fallback(sid, self.called_ids)
            if fb_id and self._can_call(fb_id):
                self.delegation_graph.record_delegation("orchestrator", fb_id, mode)
                fb = self._call_specialist(
                    fb_id, self.current_task.enriched_description, elapsed_ms
                )
                fb.fallback_used = True
                if fb.output:
                    self.scratchpad.write(
                        author_id=fb_id,
                        author_role=self.registry.get(fb_id).role,
                        content=fb.output,
                    )
                results.append(fb)
                # Do NOT append fb_id here β€” step() uniformly extends called_ids
                # from all step_results after _do_call returns, so appending here
                # would cause a double-count (efficiency penalty and DAG mask both
                # use called_ids, making the fallback specialist appear called twice).
        return results

    def _quick_quality_score(self, output: str, task: str) -> float:
        """Fast T1 cosine similarity β€” used for within-step stopping conditions."""
        try:
            t = self.registry.embed_query(task)
            o = self.registry.embed_query(output[:800])
            return float((self.registry.cosine_similarity(t, o) + 1.0) / 2.0)
        except Exception:
            return 0.5

    def _synthesize_outputs(self, outputs: list[str]) -> str:
        """Merge multiple specialist outputs into one coherent synthesis."""
        import os
        if os.getenv("OPENAI_API_KEY") and len(outputs) >= 2:
            try:
                from openai import OpenAI
                combined = "\n\n---\n\n".join(
                    f"Specialist {i+1}:\n{o[:500]}" for i, o in enumerate(outputs)
                )
                client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
                resp = client.chat.completions.create(
                    model="gpt-4o-mini", max_tokens=600,
                    messages=[
                        {"role": "system", "content":
                            "Synthesize these specialist analyses into one coherent "
                            "recommendation. Resolve contradictions, highlight consensus."},
                        {"role": "user", "content": combined[:2000]},
                    ],
                )
                return resp.choices[0].message.content
            except Exception as exc:
                print(f"[Synthesize] {exc}")
        joined = "\n\n".join(f"[{i+1}] {o[:200]}" for i, o in enumerate(outputs))
        return (
            f"Synthesis of {len(outputs)} specialist outputs:\n{joined}\n"
            "Consensus: structured design, domain best practices, iterative validation."
        )

    # ── DelegationMode handlers ───────────────────────────────────────

    def _exec_sequential(
        self, action: FactoredAction, elapsed_ms: float
    ) -> list[SpecialistResult]:
        """A→B→C: each specialist receives accumulated context from prior outputs.
        Highest quality for dependent sub-problems."""
        results: list[SpecialistResult] = []
        context = ""
        for sid in action.specialist_ids:
            batch = self._do_call(
                sid, self.current_task.enriched_description,
                elapsed_ms, mode="SEQUENTIAL",
                context=context or None,
            )
            results.extend(batch)
            for r in batch:
                if r.output:
                    context += f"\n{r.output[:400]}"
        return results

    def _exec_parallel(
        self, action: FactoredAction, elapsed_ms: float
    ) -> list[SpecialistResult]:
        """All specialists see the same task independently β€” no context sharing.
        Lower quality than SEQUENTIAL, lower effective latency for independent work."""
        results: list[SpecialistResult] = []
        for sid in action.specialist_ids:
            results.extend(
                self._do_call(
                    sid, self.current_task.enriched_description,
                    elapsed_ms, mode="PARALLEL",
                )
            )
        return results

    def _exec_fan_out_reduce(
        self, action: FactoredAction, elapsed_ms: float
    ) -> list[SpecialistResult]:
        """Fan-out: all specialists run independently; reduce: a synthesis pass
        merges all outputs into one recommendation. Highest quality, highest cost."""
        results = self._exec_parallel(action, elapsed_ms)
        successful_outs = [
            r.output for r in results
            if r.status == SpecialistStatus.SUCCESS and r.output
        ]
        if len(successful_outs) >= 2:
            synthesis = self._synthesize_outputs(successful_outs)
            synth = SpecialistResult(
                specialist_id="synthesizer",
                status=SpecialistStatus.SUCCESS,
                output=synthesis,
                latency_ms=0.0,
            )
            self.scratchpad.write(
                author_id="synthesizer",
                author_role="Synthesis Mediator",
                content=synthesis,
            )
            results.append(synth)
        return results

    def _exec_iterative(
        self, action: FactoredAction, elapsed_ms: float
    ) -> list[SpecialistResult]:
        """Repeatedly call one specialist, feeding its output back as context,
        until quality threshold met or max_rounds exhausted."""
        if not action.specialist_ids:
            return []
        sid        = action.specialist_ids[0]
        max_rounds = int(action.mode_params.get("max_rounds", 3))
        threshold  = float(action.mode_params.get("quality_threshold", 0.70))
        results: list[SpecialistResult] = []
        context = ""
        for _ in range(max(1, max_rounds)):
            batch = self._do_call(
                sid, self.current_task.enriched_description,
                elapsed_ms, mode="ITERATIVE",
                context=context or None,
            )
            results.extend(batch)
            for r in batch:
                if r.output:
                    if self._quick_quality_score(r.output, self.current_task.enriched_description) >= threshold:
                        return results
                    context = r.output
        return results

    def _exec_conditional(
        self, action: FactoredAction, elapsed_ms: float
    ) -> list[SpecialistResult]:
        """Call specialists in order; stop as soon as one meets the quality
        threshold β€” avoids unnecessary calls when the first is sufficient."""
        threshold = float(action.mode_params.get("condition_threshold", 0.60))
        results: list[SpecialistResult] = []
        for sid in action.specialist_ids:
            batch = self._do_call(
                sid, self.current_task.enriched_description,
                elapsed_ms, mode="CONDITIONAL",
            )
            results.extend(batch)
            for r in batch:
                if r.output and self._quick_quality_score(
                    r.output, self.current_task.enriched_description
                ) >= threshold:
                    return results
        return results

    def _exec_priority_queue(
        self, action: FactoredAction, elapsed_ms: float
    ) -> list[SpecialistResult]:
        """Sort selected specialists by task-similarity, call highest-ranked first,
        stop when output quality meets stop_threshold. Good for SLA-sensitive tasks."""
        threshold = float(action.mode_params.get("stop_threshold", 0.70))
        task_emb  = self.registry.embed_query(self.current_task.enriched_description)
        sorted_sids = sorted(
            [sid for sid in action.specialist_ids if self._can_call(sid)],
            key=lambda s: (
                self.registry.cosine_similarity(
                    task_emb, self.registry.get(s).to_state_vector()
                ) if s in self.registry.list_ids() else 0.0
            ),
            reverse=True,
        )
        results: list[SpecialistResult] = []
        for sid in sorted_sids:
            batch = self._do_call(
                sid, self.current_task.enriched_description,
                elapsed_ms, mode="PRIORITY_QUEUE",
            )
            results.extend(batch)
            for r in batch:
                if r.output and self._quick_quality_score(
                    r.output, self.current_task.enriched_description
                ) >= threshold:
                    return results
        return results

    def _exec_broadcast(
        self, action: FactoredAction, elapsed_ms: float
    ) -> list[SpecialistResult]:
        """Call all specialists independently, return only the single best result.
        Trades extra API calls for a quality ceiling guarantee."""
        results = self._exec_parallel(action, elapsed_ms)
        successful = [
            r for r in results
            if r.status == SpecialistStatus.SUCCESS and r.output
        ]
        if not successful:
            return results
        best = max(
            successful,
            key=lambda r: self._quick_quality_score(
                r.output, self.current_task.enriched_description
            ),
        )
        self.scratchpad.write(
            author_id=best.specialist_id,
            author_role=(
                self.registry.get(best.specialist_id).role
                if best.specialist_id in self.registry.list_ids() else "Specialist"
            ),
            content=f"[BROADCAST WINNER]\n{best.output}",
        )
        return [best]

    # ── MetaAction handlers ───────────────────────────────────────────

    def _exec_meta_mediator(
        self, action: FactoredAction, elapsed_ms: float
    ) -> list[SpecialistResult]:
        """Synthesise all current specialist_results to resolve conflicts.
        Only meaningful after β‰₯2 specialist outputs exist this episode."""
        outputs = [
            r.output for r in self.specialist_results
            if r.status == SpecialistStatus.SUCCESS and r.output
        ]
        if len(outputs) < 2:
            return []
        synthesis = self._synthesize_outputs(outputs)
        result = SpecialistResult(
            specialist_id="mediator",
            status=SpecialistStatus.SUCCESS,
            output=synthesis,
            latency_ms=0.0,
        )
        self.scratchpad.write(
            author_id="mediator", author_role="Conflict Mediator", content=synthesis
        )
        return [result]

    def _exec_meta_clarify(
        self, action: FactoredAction, elapsed_ms: float
    ) -> list[SpecialistResult]:
        """Enrich the current task description (via LLM when key available).
        All future specialist calls in this episode see the richer description."""
        import os
        original = self.current_task.enriched_description
        if os.getenv("OPENAI_API_KEY"):
            try:
                from openai import OpenAI
                client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
                resp = client.chat.completions.create(
                    model="gpt-4o-mini", max_tokens=250,
                    messages=[
                        {"role": "system", "content":
                            "Expand this task into a more specific, actionable description. "
                            "Add missing technical context. Keep it under 3 sentences."},
                        {"role": "user", "content": original[:500]},
                    ],
                )
                clarified = resp.choices[0].message.content.strip()
            except Exception as exc:
                print(f"[ClarifyTask] {exc}")
                clarified = original + " [Clarified: requires structured design and domain-specific approach]"
        else:
            clarified = (
                original + " [Clarified: requires structured design, "
                "clear acceptance criteria, and a domain-specific technical approach]"
            )
        self.current_task = type(self.current_task)(
            original_description=self.current_task.original_description,
            enriched_description=clarified,
            complexity_class=self.current_task.complexity_class,
            expected_specialists=self.current_task.expected_specialists,
            domain_hints=self.current_task.domain_hints,
            is_ambiguous=False,
            autonomously_enriched=True,
        )
        self.scratchpad.write(
            author_id="orchestrator", author_role="Orchestrator",
            content=f"Task clarified: {clarified[:300]}",
        )
        self._task_emb = self.registry.embed_query(clarified)
        return []  # effect is through improved quality on future specialist calls

    def _exec_meta_delegate_subtask(
        self, action: FactoredAction, elapsed_ms: float
    ) -> list[SpecialistResult]:
        """Decompose the task into 2–3 subtasks and route each to the best-matching
        sub-specialist, with the lead specialist as the DAG caller (depth 1β†’2).

        This is the only execution path that produces depth > 1 in the delegation
        graph.  The first specialist in action.specialist_ids acts as the delegating
        node; its sub-calls are recorded as specialist β†’ sub-specialist edges so
        self.delegation_graph.depth reaches 2 when max_depth=2 permits it.
        """
        import os, json
        task = self.current_task.enriched_description

        # ── Step 1: call the lead specialist at depth 1 (orchestrator β†’ lead) ──
        lead_id = next(
            (sid for sid in action.specialist_ids if self._can_call(sid, "orchestrator")),
            None,
        )
        results: list[SpecialistResult] = []
        if lead_id:
            results.extend(self._do_call(lead_id, task, elapsed_ms,
                                         mode="DELEGATE_SUBTASK", caller_id="orchestrator"))
        # If no lead could be called, fall through to sequential
        if not lead_id:
            return self._exec_sequential(action, elapsed_ms)

        # ── Step 2: decompose into subtasks ──
        subtasks: list[str] = []
        if os.getenv("OPENAI_API_KEY"):
            try:
                from openai import OpenAI
                client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
                resp = client.chat.completions.create(
                    model="gpt-4o-mini", max_tokens=250,
                    response_format={"type": "json_object"},
                    messages=[
                        {"role": "system", "content":
                            "Break this task into 2-3 distinct subtasks. "
                            "Return JSON: {\"subtasks\": [\"subtask1\", ...]}"},
                        {"role": "user", "content": task[:500]},
                    ],
                )
                subtasks = json.loads(resp.choices[0].message.content).get("subtasks", [])[:3]
            except Exception as exc:
                print(f"[DelegateSubtask] {exc}")
        if not subtasks:
            subtasks = [
                f"{task[:200]} β€” part 1: design and requirements",
                f"{task[:200]} β€” part 2: implementation and validation",
            ]

        # ── Step 3: route each subtask from lead_id (depth 1 β†’ 2) ──
        for subtask in subtasks:
            sub_emb = self.registry.embed_query(subtask)
            for sid, _ in self.registry.find_most_similar(sub_emb, top_k=self.max_specialists):
                if self._can_call(sid, caller_id=lead_id):
                    results.extend(self._do_call(sid, subtask, elapsed_ms,
                                                 mode="DELEGATE_SUBTASK", caller_id=lead_id))
                    break
        return results

    def _exec_meta_retry(
        self, action: FactoredAction, elapsed_ms: float
    ) -> list[SpecialistResult]:
        """Retry all failed/timed-out specialist calls using the FallbackChainResolver."""
        failed = [r for r in self.specialist_results if r.status != SpecialistStatus.SUCCESS]
        if not failed:
            return []
        results: list[SpecialistResult] = []
        for fr in failed:
            fb_id = self.fallback_resolver.get_fallback(fr.specialist_id, self.called_ids)
            if fb_id and self._can_call(fb_id):
                batch = self._do_call(
                    fb_id, self.current_task.enriched_description,
                    elapsed_ms, mode="RETRY_FAILED",
                )
                for r in batch:
                    r.fallback_used = True
                results.extend(batch)
        return results

    def _exec_meta_parallel_spawn(
        self, action: FactoredAction, elapsed_ms: float
    ) -> list[SpecialistResult]:
        """Spawn all selected specialists in parallel (delegates to PARALLEL mode)."""
        return self._exec_parallel(action, elapsed_ms)

    # ── Roster management ─────────────────────────────────────────────

    def _select_active_specialists(self, task_emb: np.ndarray) -> list[str]:
        """
        Pick the max_specialists agents most relevant to this task.
        Always ensures any specialist spawned this episode is in the set.
        """
        ranked = self.registry.find_most_similar(
            task_emb, top_k=self.registry.size
        )
        selected = [sid for sid, _ in ranked[: self.max_specialists]]

        # Guarantee newly spawned specialists are in the active window
        for sid in self.spawned_this_episode:
            if sid not in selected:
                selected[-1] = sid  # replace least-relevant

        return selected

    def _exec_meta_spawn_specialist(
        self, action: FactoredAction, elapsed_ms: float
    ) -> list[SpecialistResult]:
        """
        Policy-triggered specialist spawn.
        Guards: OPENAI_API_KEY required, cooldown and total cap enforced.
        After a successful spawn the active roster and action decoder are
        refreshed so the new specialist is immediately selectable.
        """
        import os
        task_desc = self.current_task.enriched_description

        # Guard: no API key
        if not os.getenv("OPENAI_API_KEY"):
            return []

        # Guard: total cap
        if self._spawn_total_count >= self._spawn_max_total:
            return []

        # Guard: cooldown
        episodes_since_last = self._episode_index - self._last_spawn_episode
        if episodes_since_last < self._spawn_cooldown_episodes:
            return []

        # All guards passed β€” attempt spawn
        prev_count = self._spawn_total_count
        top1 = self.registry.find_most_similar(self._task_emb, top_k=1)
        best_id  = top1[0][0] if top1 else ""
        best_sim = top1[0][1] if top1 else 0.0
        self._spawn_via_llm(task_desc, best_sim=best_sim, best_id=best_id)

        if self._spawn_total_count > prev_count:
            new_id = self.spawned_this_episode[-1]
            # Refresh active roster so the new specialist is immediately reachable
            self.active_specialist_ids = self._select_active_specialists(self._task_emb)
            self.action_decoder = ActionDecoder(
                specialist_ids=self.active_specialist_ids,
                max_specialists=self.max_specialists,
            )
            return [SpecialistResult(
                specialist_id=new_id,
                status=SpecialistStatus.SUCCESS,
                output=f"[SpawnSpecialist] Spawned '{new_id}' successfully.",
                latency_ms=0.0,
            )]
        else:
            return [SpecialistResult(
                specialist_id="spawn_attempt",
                status=SpecialistStatus.ERROR,
                output="[SpawnSpecialist] LLM spawn failed β€” see logs.",
                latency_ms=0.0,
            )]

    def _maybe_spawn_specialist(
        self, task_emb: np.ndarray, task: str
    ) -> None:
        """
        Auto-spawn a new specialist via LLM when the best existing match
        falls below spawn_threshold.  Skipped when no OPENAI_API_KEY.
        """
        top1 = self.registry.find_most_similar(task_emb, top_k=1)
        if not top1:
            return
        best_id, best_sim = top1[0]
        if best_sim >= self.spawn_threshold:
            return  # roster already covers the task well enough
        self._spawn_via_llm(task, best_sim, best_id)

    def _spawn_via_llm(
        self, task: str, best_sim: float, best_id: str
    ) -> None:
        """
        Ask GPT-4o-mini to design a new specialist for this task,
        then add it to the registry so it enters the active roster.
        Conditions the prompt on past successful spawns for similar tasks.
        """
        import os, json
        existing_roles = [self.registry.get(s).role for s in self.registry.list_ids()]
        best_role = self.registry.get(best_id).role if best_id else "none"

        # Retrieve similar past successful spawns for RAG context
        min_reward = self.config.get("environment", {}).get("spawn_memory_min_reward", 0.0)
        past_spawns = self._spawn_memory.retrieve_similar(
            self._task_emb, top_k=3, min_reward=min_reward
        )
        past_context = ""
        if past_spawns:
            examples = "\n".join(
                f"- Role: {r.specialist_role}  |  "
                f"Desc: {r.specialist_desc[:150]}  |  "
                f"Reward: {r.episode_reward:.2f}"
                for r in past_spawns
            )
            past_context = (
                f"\n\nPast successful spawns for similar tasks:\n{examples}\n"
                "Use these as inspiration but create something distinct if needed."
            )

        try:
            from openai import OpenAI
            client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
            resp = client.chat.completions.create(
                model="gpt-4o-mini",
                max_tokens=350,
                response_format={"type": "json_object"},
                messages=[
                    {
                        "role": "system",
                        "content": (
                            "You design specialist agent definitions for a multi-agent "
                            "delegation system. Return valid JSON only."
                        ),
                    },
                    {
                        "role": "user",
                        "content": (
                            f"Task: {task[:400]}\n\n"
                            f"Existing specialists: {', '.join(existing_roles)}\n"
                            f"Best current match: {best_role} "
                            f"(cosine similarity {best_sim:.2f} β€” below threshold)."
                            f"{past_context}\n\n"
                            "Define a new specialist better suited to this task. "
                            "Return JSON with keys: id (snake_case), role (title case), "
                            "description (2–3 sentences of domain expertise), "
                            "complexity_affinity (list from [atomic,simple,moderate,complex,enterprise]), "
                            "avg_latency_ms (integer, 2000–8000)."
                        ),
                    },
                ],
            )
            data = json.loads(resp.choices[0].message.content)
            required = {"id", "role", "description", "complexity_affinity", "avg_latency_ms"}
            if not required.issubset(data):
                print(f"[SpawnSpecialist] Incomplete JSON: {data}")
                return
            # Deduplicate ID
            base_id = str(data["id"]).lower().replace(" ", "_")
            uid = base_id
            suffix = 2
            while uid in self.registry.list_ids():
                uid = f"{base_id}_v{suffix}"
                suffix += 1
            data["id"] = uid
            self.registry.add_specialist(data)
            self.spawned_this_episode.append(uid)
            self._spawn_total_count += 1
            self._last_spawn_episode = self._episode_index
            print(
                f"[SpawnSpecialist] Created '{data['role']}' (id={uid}) "
                f"for task (best_sim was {best_sim:.2f}, "
                f"total spawned={self._spawn_total_count}/{self._spawn_max_total})"
            )
            # Stage a pending spawn record β€” reward filled in at episode end
            self._pending_spawn_records.append(SpawnRecord(
                task_embedding=self._task_emb.tolist(),
                task_description=task,
                specialist_id=uid,
                specialist_role=data["role"],
                specialist_desc=data["description"],
                episode_reward=0.0,    # filled in at episode end
                pre_spawn_sim=best_sim,
                post_spawn_sim=0.0,    # filled after re-ranking
                episode_idx=self._episode_index,
            ))
        except Exception as exc:
            print(f"[SpawnSpecialist] Failed: {exc}")

    # ── Specialist execution ───────────────────────────────────────────

    def _call_specialist(
        self, specialist_id: str, task: str, elapsed_ms: float,
        context: str | None = None,
    ) -> SpecialistResult:
        """
        Call a specialist.
        Priority order:
          1. use_real_spindleflow=True  β†’ TypeScript SpindleFlow subprocess
          2. OPENAI_API_KEY set         β†’ real OpenAI call per specialist
          3. neither                    β†’ fast simulation (training / offline)

        context: optional accumulated output from prior specialists (SEQUENTIAL/ITERATIVE).
        """
        import os
        specialist = self.registry.get(specialist_id)

        if self.use_real_spindleflow:
            output, latency, status = self._call_real_spindleflow(specialist_id, task)
        elif os.getenv("OPENAI_API_KEY") and not self.simulate_specialists:
            output, latency, status = self._call_openai_specialist(specialist_id, task, context=context)
        else:
            output  = self._simulate_specialist_output(specialist_id, task, context=context)
            latency = specialist.avg_latency_ms + np.random.normal(0, 500)
            status  = SpecialistStatus.SUCCESS

        return SpecialistResult(
            specialist_id=specialist_id,
            status=status,
            output=output,
            latency_ms=max(0, latency),
        )

    def _call_openai_specialist(
        self, specialist_id: str, task: str,
        context: str | None = None,
    ) -> tuple[str, float, SpecialistStatus]:
        """Call GPT-4o-mini acting as this specialist. Each gets its own system prompt.

        context: prior specialist output (SEQUENTIAL/ITERATIVE). When present, injected
                 as a user/assistant exchange before the current task so the model builds
                 on accumulated analysis rather than starting fresh.
        """
        import os
        specialist = self.registry.get(specialist_id)
        start = time.time()
        try:
            from openai import OpenAI
            client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
            if specialist.system_prompt:
                system_content = specialist.system_prompt
            else:
                system_content = (
                    f"You are a {specialist.role}. {specialist.description} "
                    f"Give a focused, expert response relevant to your specialty."
                )
            messages = [{"role": "system", "content": system_content}]
            if context:
                messages.append({
                    "role": "user",
                    "content": f"Prior specialist analysis:\n{context[:600]}",
                })
                messages.append({
                    "role": "assistant",
                    "content": "Understood. I'll build on this prior analysis.",
                })
            messages.append({"role": "user", "content": f"Task: {task[:600]}"})
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                max_tokens=600,
                messages=messages,
            )
            latency = (time.time() - start) * 1000
            return response.choices[0].message.content, latency, SpecialistStatus.SUCCESS
        except Exception as exc:
            latency = (time.time() - start) * 1000
            print(f"[OpenAI specialist {specialist_id}] Error: {exc}")
            return "", latency, SpecialistStatus.ERROR

    def _simulate_specialist_output(
        self, specialist_id: str, task: str,
        context: str | None = None,
    ) -> str:
        """
        Simulate specialist output for training (no API key).

        Critically: the task text is NOT embedded in the output.
        Output quality is driven entirely by domain vocabulary from the
        specialist description, which naturally correlates with the task
        embedding when the specialist is a good match. This gives T1
        quality_delta a real signal (specialist–task domain overlap)
        rather than the degenerate case where both sides quote task[:100]
        and collapse quality_delta to noise.

        context: prior specialist output (SEQUENTIAL/ITERATIVE). When present and
                 similarity is high, the output acknowledges and extends prior work.

        Three quality tiers based on specialist-task cosine similarity:
          > 0.45  β†’ rich domain analysis (high T1 score if relevant)
          > 0.25  β†’ partial domain guidance
          ≀ 0.25  β†’ mismatched β€” minimal domain content (low T1 score)
        """
        specialist = self.registry.get(specialist_id)
        task_emb = self.registry.embed_query(task)
        spec_emb = specialist.to_state_vector()
        similarity = self.registry.cosine_similarity(task_emb, spec_emb)

        context_prefix = ""
        if context and similarity > 0.45:
            context_prefix = (
                f"Building on the prior analysis, I will extend with {specialist.role.lower()} "
                f"expertise.\n"
            )

        if similarity > 0.45:
            return (
                f"{context_prefix}As a {specialist.role}, here is my expert analysis.\n"
                f"{specialist.description}\n"
                f"Key technical considerations from this domain: systematic design, "
                f"stakeholder alignment, iterative validation, and rigorous testing. "
                f"I recommend applying established {specialist.role.lower()} frameworks "
                f"with particular attention to quality gates and domain-specific constraints."
            )
        elif similarity > 0.25:
            return (
                f"As a {specialist.role}, I can provide partial guidance. "
                f"My expertise: {specialist.description[:200]}. "
                f"For aspects outside my specialty, additional expert input is recommended."
            )
        else:
            return (
                f"As a {specialist.role}, this request falls largely outside my primary domain. "
                f"I can offer only general guidance and recommend a more suitable specialist."
            )

    def _call_real_spindleflow(
        self, specialist_id: str, task: str
    ) -> tuple[str, float, SpecialistStatus]:
        """
        Call the real SpindleFlow TypeScript backend via subprocess.
        Returns (output, latency_ms, status).
        """
        import subprocess
        import json
        import os
        import tempfile

        spindleflow_path = os.getenv("SPINDLEFLOW_PATH", "../SpindleFlow")
        specialist = self.registry.get(specialist_id)

        config = {
            "models": {
                "gemini": {
                    "provider": "gemini",
                    "model": "gemini-2.5-flash-lite",
                    "max_tokens": 4096,
                }
            },
            "provider": "gemini",
            "agents": [{
                "id": specialist_id,
                "role": specialist.role,
                "goal": specialist.description,
            }],
            "workflow": {
                "type": "sequential",
                "steps": [{"agent": specialist_id}],
            },
        }

        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".yml", delete=False
        ) as f:
            yaml.dump(config, f)
            config_path = f.name

        start = time.time()
        try:
            result = subprocess.run(
                ["npm", "run", "dev", "--", "run", config_path, "-i", task[:500]],
                cwd=spindleflow_path,
                capture_output=True,
                text=True,
                timeout=self.specialist_timeout_ms / 1000,
            )
            latency = (time.time() - start) * 1000
            if result.returncode == 0:
                output = result.stdout[-2000:]
                return output, latency, SpecialistStatus.SUCCESS
            else:
                return "", latency, SpecialistStatus.ERROR
        except subprocess.TimeoutExpired:
            latency = (time.time() - start) * 1000
            return "", latency, SpecialistStatus.TIMEOUT
        finally:
            try:
                os.unlink(config_path)
            except Exception:
                pass

    def _generate_generalist_baseline(self, task: str) -> str:
        """
        Generate a generalist (non-specialist) response to the task.
        Uses OpenAI when OPENAI_API_KEY is set (regardless of use_real_spindleflow).
        Falls back to a simulated template when no key is available.
        """
        import os
        if getattr(self, "simulate_specialists", False) or not os.getenv("OPENAI_API_KEY"):
            return (
                "General problem-solving approach:\n"
                "1. Gather and clarify requirements\n"
                "2. Research common solution patterns\n"
                "3. Draft a high-level architecture\n"
                "4. Implement in small, testable increments\n"
                "5. Validate against acceptance criteria and deploy\n"
                "No specialist domain expertise applied."
            )
        api_key = os.getenv("OPENAI_API_KEY")
        if api_key:
            try:
                from openai import OpenAI
                client = OpenAI(api_key=api_key)
                response = client.chat.completions.create(
                    model="gpt-4o-mini",
                    max_tokens=500,
                    messages=[{"role": "user", "content": f"Please help with: {task}"}],
                )
                return response.choices[0].message.content
            except Exception as e:
                print(f"[Baseline] OpenAI error: {e}. Using simulated baseline.")
        # Simulation baseline: domain-neutral boilerplate, NO task text.
        # Must embed far from any specific task so quality_delta is positive
        # whenever a matched specialist contributes domain-relevant content.
        return (
            "General problem-solving approach:\n"
            "1. Gather and clarify requirements\n"
            "2. Research common solution patterns\n"
            "3. Draft a high-level architecture\n"
            "4. Implement in small, testable increments\n"
            "5. Validate against acceptance criteria and deploy\n"
            "No specialist domain expertise applied."
        )

    def _compute_step_reward(
        self,
        step_results: list[SpecialistResult],
        task_emb: np.ndarray,
        delegation_mode: "DelegationMode | None" = None,
        meta_action: "MetaAction | None" = None,
    ) -> float:
        """
        Per-step shaping reward for non-terminal steps.

        Base shaping:
          +0.02  per specialist whose cosine-sim with task > 0.35  (good routing)
          -0.01  per specialist below 0.20                          (mismatch)
          -0.01  per failed call

        Mode-specific adjustments (make mode choice matter before terminal reward):

          PARALLEL β€” specialists ran concurrently; effective wall-clock cost is
            max(latencies) not sum(latencies).  Reward the latency saving when
            β‰₯2 specialists ran: +0.01 * (1 - max_lat / sum_lat).
            E.g. 3 specialists Γ— 1 s each β†’ sum=3 s, max=1 s β†’ saving=0.67 β†’
            bonus β‰ˆ +0.0067.  Scales to zero when only one specialist runs.

          SEQUENTIAL β€” scratchpad-chaining means each specialist built on prior
            output.  Reward the coordination effort: +0.01 per specialist after
            the first one (they had real context to work with), capped at +0.03.

        Scale stays small vs terminal range [-1, 2] so episode quality_delta
        dominates.  Total step shaping over 10 steps tops out at ~0.25.
        """
        if not step_results or not self.current_task:
            self._last_reward_components = {"step_shaping": 0.0}
            return 0.0

        shaped = 0.0
        for result in step_results:
            if result.status != SpecialistStatus.SUCCESS:
                shaped -= 0.01
                continue
            if result.specialist_id not in self.registry.list_ids():
                continue
            spec_emb = self.registry.get(result.specialist_id).to_state_vector()
            sim = self.registry.cosine_similarity(task_emb, spec_emb)
            if sim > 0.35:
                shaped += 0.02
            elif sim < 0.20:
                shaped -= 0.01

        # Mode-specific bonus
        mode_bonus = 0.0
        successful = [r for r in step_results if r.status == SpecialistStatus.SUCCESS]
        if delegation_mode == DelegationMode.PARALLEL and len(successful) >= 2:
            latencies = [r.latency_ms for r in successful]
            sum_lat = sum(latencies)
            if sum_lat > 0:
                saving = 1.0 - max(latencies) / sum_lat
                mode_bonus = round(0.01 * saving, 4)
        elif delegation_mode == DelegationMode.SEQUENTIAL and len(successful) >= 2:
            # Each specialist after the first had chained context
            chained_count = len(successful) - 1
            mode_bonus = min(0.01 * chained_count, 0.03)

        shaped += mode_bonus

        # Spawn quality shaping β€” only on SPAWN_SPECIALIST steps
        spawn_bonus = 0.0
        if meta_action == MetaAction.SPAWN_SPECIALIST:
            spawn_succeeded = any(
                r.status == SpecialistStatus.SUCCESS
                and r.specialist_id in self.spawned_this_episode
                for r in step_results
            )
            if spawn_succeeded:
                new_id = self.spawned_this_episode[-1]
                try:
                    new_spec_vec = self.registry.get(new_id).to_state_vector()
                    new_sim = float(self.registry.cosine_similarity(task_emb, new_spec_vec))
                    # Reward coverage gap closed above threshold; penalise redundant spawns
                    spawn_bonus = round(0.05 * max(0.0, new_sim - self.spawn_threshold), 4)
                except Exception:
                    spawn_bonus = 0.0
            else:
                # Guard hit or LLM failed β€” mild penalty to discourage wasteful spawn attempts
                spawn_bonus = -0.02

        shaped += spawn_bonus
        self._last_reward_components = {
            "step_shaping": float(shaped),
            "mode_bonus": float(mode_bonus),
            "spawn_bonus": float(spawn_bonus),
        }
        return float(shaped)

    def _compute_final_reward(self, elapsed_ms: float) -> float:
        """Compute the full reward for a completed episode."""
        _zero = {k: 0.0 for k in [
            "quality_delta", "efficiency_penalty", "failure_penalty",
            "recovery_bonus", "conflict_penalty", "conflict_bonus",
            "consistency_bonus", "latency_penalty", "explanation_bonus",
        ]}
        if not self.specialist_results or not self.current_task:
            self._last_reward_components = {**_zero, "failure_penalty": -0.1}
            return -0.1

        successful_outputs = [
            r.output for r in self.specialist_results
            if r.status == SpecialistStatus.SUCCESS and r.output
        ]

        if not successful_outputs:
            self._last_reward_components = {**_zero, "failure_penalty": -0.2}
            return -0.2

        specialist_output = "\n\n".join(successful_outputs)
        task_desc = self.current_task.enriched_description

        # Delta reward β€” same tier for both
        specialist_score = self.reward_scorer.score(
            specialist_output, task_desc, self.tier_lock
        )
        baseline_score = self.reward_scorer.score(
            self.generalist_baseline, task_desc, self.tier_lock
        )
        quality_delta = specialist_score - baseline_score

        # Efficiency penalty
        n = len(self.called_ids)
        expected = self.current_task.expected_specialists
        efficiency_penalty = self.config_reward["efficiency_base_penalty"] * \
                             max(0, n - expected)

        # Failure signals
        failure_penalty = compute_failure_penalty(self.specialist_results)
        recovery_bonus = compute_recovery_bonus(
            self.specialist_results, episode_completed=True
        )

        # Conflict signals
        conflicts = detect_conflicts(
            self.specialist_results,
            registry=self.registry,
            contradiction_pairs=self._contradiction_pairs,
            similarity_threshold=self.config_reward.get(
                "conflict_similarity_threshold", 0.25
            ),
        )
        if conflicts:
            self.conflict_resolver.resolve_all(conflicts, self.specialist_results)
        conflict_penalty = self.config_reward["conflict_unresolved_penalty"] * \
                          len([c for c in conflicts if not c.resolved])
        conflict_bonus = self.config_reward["conflict_resolved_bonus"] * \
                        len([c for c in conflicts if c.resolved])

        # Consistency bonus
        path = self.delegation_graph.get_delegation_path()
        consistency = self.consistency_tracker.consistency_score(
            path, self.current_task.complexity_class
        )
        consistency_bonus = self.config_reward["consistency_bonus_weight"] * consistency

        # Latency penalty
        latency_penalty = compute_latency_penalty(elapsed_ms, self.latency_sla)

        # Explanation bonus
        explanation_bonus = (
            self.config_reward["explanation_bonus"]
            if self.delegation_graph.is_auditable()
            else 0.0
        )

        self.consistency_tracker.record_path(
            self.current_task.complexity_class, path
        )

        total_reward = (
            quality_delta
            - efficiency_penalty
            - failure_penalty
            + recovery_bonus
            - conflict_penalty
            + conflict_bonus
            + consistency_bonus
            - latency_penalty
            + explanation_bonus
        )

        self._last_reward_components = {
            "quality_delta":      float(quality_delta),
            "efficiency_penalty": float(-efficiency_penalty),
            "failure_penalty":    float(-failure_penalty),
            "recovery_bonus":     float(recovery_bonus),
            "conflict_penalty":   float(-conflict_penalty),
            "conflict_bonus":     float(conflict_bonus),
            "consistency_bonus":  float(consistency_bonus),
            "latency_penalty":    float(-latency_penalty),
            "explanation_bonus":  float(explanation_bonus),
        }

        total_reward_clipped = float(np.clip(total_reward, -1.0, 2.0))

        # Record conflict resolution outcomes so the bandit can learn
        self.conflict_resolver.record_episode_outcome(
            quality_delta=float(quality_delta),
            episode_idx=self._episode_index,
        )

        # Finalise pending spawn records with the actual episode reward
        if self._pending_spawn_records and self._task_emb is not None:
            top_post = self.registry.find_most_similar(self._task_emb, top_k=1)
            post_sim = top_post[0][1] if top_post else 0.0
            for rec in self._pending_spawn_records:
                rec.episode_reward = total_reward_clipped
                rec.post_spawn_sim = post_sim
                self._spawn_memory.record(rec)
            self._pending_spawn_records = []

        return total_reward_clipped

    def _record_episode_to_memory(self, episode_reward: float) -> None:
        """Record each specialist's output and the episode reward to SpecialistMemory."""
        if not self.current_task:
            return
        task_desc = self.current_task.enriched_description
        for result in self.specialist_results:
            if result.specialist_id in self.spawned_this_episode:
                continue   # skip spawn confirmation messages
            if result.status == SpecialistStatus.SUCCESS and result.output:
                self.specialist_memory.record(
                    specialist_id=result.specialist_id,
                    task=task_desc,
                    output=result.output,
                    reward=episode_reward,
                )

    def render(self) -> None:
        if self.render_mode == "human" and self.current_task:
            print(f"\n[Episode State]")
            print(f"  Task: {self.current_task.enriched_description[:80]}")
            print(f"  Step: {self.step_count}/{self.max_steps}")
            print(f"  Called: {self.called_ids}")
            print(f"  Depth: {self.delegation_graph.depth}")

    def close(self) -> None:
        pass