File size: 40,778 Bytes
26bf1c9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
"""
Core Ad Fraud Investigation Environment (Investigator role).

Implements the OpenEnv Environment interface for the Investigator agent:
reviewing a queue of ads, investigating them, and rendering verdicts under
a budget constraint.

In Round 1 this environment is used standalone (the `AdFraudEnvironment`
alias preserves backwards compatibility). In Round 2 it is driven by the
`RefereeEnvironment`, which pre-generates episodes and supplies a shared
`InvestigationToolRegistry` so Fraudster-proposed ads are reachable through
the same investigation code path.
"""

from __future__ import annotations

import logging
from typing import Any, Dict, List, Optional
from uuid import uuid4

from openenv.core.env_server.interfaces import Environment

try:
    from ..data.ad_generator import (
        TASK_CONFIGS,
        Ad,
        GeneratedEpisode,
        generate_episode,
    )
    from ..data.tool_registry import InvestigationToolRegistry
    from ..models import AdFraudState, AdReviewAction, AdReviewObservation
    from ..graders.base_grader import EpisodeRecord, LinkResult, VerdictResult, grade_episode
    from .evidence_ledger import build_evidence_ledger
except ImportError:
    from data.ad_generator import (
        TASK_CONFIGS,
        Ad,
        GeneratedEpisode,
        generate_episode,
    )
    from data.tool_registry import InvestigationToolRegistry
    from models import AdFraudState, AdReviewAction, AdReviewObservation
    from graders.base_grader import EpisodeRecord, LinkResult, VerdictResult, grade_episode
    from server.evidence_ledger import build_evidence_ledger

logger = logging.getLogger(__name__)

# Module-level store so the /grader endpoint can read the last score.
# Replaced wholesale by InvestigatorEnvironment._handle_episode_end each time
# an episode finishes.
_last_grader_result: Dict[str, Any] = {}


def get_last_grader_result() -> Dict[str, Any]:
    """Return a shallow copy of the most recent end-of-episode grader summary.

    Copying keeps callers from mutating the shared module-level store. An
    empty dict means no episode has finished in this process yet.
    """
    snapshot: Dict[str, Any] = {}
    snapshot.update(_last_grader_result)
    return snapshot


class InvestigatorEnvironment(
    Environment[AdReviewAction, AdReviewObservation, AdFraudState]
):
    """
    Ad fraud investigation environment (Investigator role).

    Each episode is a review session: the agent processes a queue of N ads
    within a limited action budget, choosing what to investigate and when
    to render verdicts. Unreviewed ads auto-approve at episode end.

    `reset()` accepts optional `episode` and `registry` kwargs so the
    Referee (or a test harness) can inject a pre-built `GeneratedEpisode`
    plus a shared `InvestigationToolRegistry`. Without them, the
    environment generates its own synthetic episode and a fresh registry
    (the Round 1 behaviour).
    """

    # Capability flag for the OpenEnv server (presumably allows several
    # environment instances to serve sessions side by side — confirm against
    # the openenv docs). NOTE(review): `_last_grader_result` is module-global
    # and overwritten at every episode end, so with concurrent sessions it
    # reflects whichever episode finished most recently.
    SUPPORTS_CONCURRENT_SESSIONS = True

    def __init__(self) -> None:
        """Create an environment with no episode loaded; call reset() before step()."""
        super().__init__()

        # Episode inputs — populated by reset().
        self._episode: Optional[GeneratedEpisode] = None
        self._registry: Optional[InvestigationToolRegistry] = None
        self._state = AdFraudState(episode_id=str(uuid4()), step_count=0)

        # Per-episode bookkeeping.
        self._verdicts: Dict[str, Dict[str, Any]] = {}   # ad_id -> verdict record
        self._links: List[Dict[str, Any]] = []           # recorded link claims
        self._investigations: Dict[str, List[str]] = {}  # ad_id -> targets pulled
        # Total `investigate` attempts per ad — INCLUDING ones that the env
        # rejects (duplicate target, hit cap, etc.).
        self._investigation_attempts: Dict[str, int] = {}

        # Progress and prompt-shaping state.
        self._cumulative_reward: float = 0.0
        self._done = False
        self._last_feedback = ""
        self._focused_ad_id: Optional[str] = None
        self._queue_may_grow: bool = False

    # ------------------------------------------------------------------
    # OpenEnv interface
    # ------------------------------------------------------------------

    def reset(
        self,
        seed: int | None = None,
        episode_id: str | None = None,
        **kwargs: Any,
    ) -> AdReviewObservation:
        """Start a new review session and return its first observation.

        Recognised keyword arguments:
            task_id: key into ``TASK_CONFIGS``; missing or unknown values
                fall back to ``"task_1"``.
            episode: optional pre-built ``GeneratedEpisode`` used instead of
                generating one (``seed`` is then ignored).
            registry: optional shared ``InvestigationToolRegistry``; when
                absent one is built from the episode.
            queue_may_grow: truthy flag stored on the instance and passed
                through to observations.

        Without an injected episode and with ``seed=None``, a 32-bit seed is
        derived from a fresh UUID.

        NOTE(review): with an injected episode, ``state.task_id`` still comes
        from the ``task_id`` kwarg while the budget/queue size come from the
        episode's ``task_config`` — confirm callers pass a matching task_id.
        """
        requested_task = kwargs.get("task_id", "task_1")
        task_id = requested_task if requested_task in TASK_CONFIGS else "task_1"

        episode_override: Optional[GeneratedEpisode] = kwargs.get("episode")
        registry_override: Optional[InvestigationToolRegistry] = kwargs.get("registry")
        self._queue_may_grow = bool(kwargs.get("queue_may_grow", False))

        if episode_override is None:
            if seed is None:
                seed = hash(uuid4()) & 0xFFFFFFFF
            self._episode = generate_episode(seed, task_id)
        else:
            self._episode = episode_override

        if registry_override is None:
            self._registry = InvestigationToolRegistry.from_episode(self._episode)
        else:
            self._registry = registry_override

        config = self._episode.task_config
        self._state = AdFraudState(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
            task_id=task_id,
            total_ads=config.queue_size,
            reviewed_count=0,
            remaining_budget=config.action_budget,
            verdicts={},
            grader_score=None,
        )

        # Wipe every piece of per-episode bookkeeping.
        self._verdicts = {}
        self._links = []
        self._investigations = {}
        self._investigation_attempts = {}
        self._cumulative_reward = 0.0
        self._done = False
        self._last_feedback = "Episode started. Review the ad queue and begin your investigation."

        queue = self._episode.ads
        self._focused_ad_id = queue[0].ad_id if queue else None

        return self._build_observation(reward=0.0, done=False)

    def step(
        self,
        action: AdReviewAction,
        timeout_s: float | None = None,
        **kwargs: Any,
    ) -> AdReviewObservation:
        """Apply one Investigator action and return the next observation.

        Every call made while an episode is loaded and unfinished consumes
        one step — including calls with an invalid ``ad_id`` or an unknown
        ``action_type`` (both penalised -0.05). After the action is handled,
        the episode ends once every ad has a verdict or ``step_count``
        reaches the task's ``action_budget``; ``_handle_episode_end`` then
        auto-approves leftovers and runs the grader.

        ``timeout_s`` and extra ``kwargs`` are accepted but not used by this
        implementation.
        """
        if self._done:
            return self._build_observation(
                reward=0.0, done=True,
                feedback_override="Episode is already complete. Call reset() to start a new episode.",
            )

        if self._episode is None:
            return self._build_observation(
                reward=0.0, done=False,
                feedback_override="Environment not initialized. Call reset() first.",
            )

        self._state.step_count += 1

        ad_ids = {a.ad_id for a in self._episode.ads}
        if action.ad_id not in ad_ids:
            # An invalid id still costs a step, so it must go through the same
            # reward / termination bookkeeping as every other action. Returning
            # early here would let repeated invalid ids push step_count past
            # action_budget without the episode ever ending, and would leave
            # the penalty out of the cumulative reward.
            self._last_feedback = f"Invalid ad_id '{action.ad_id}'. Valid IDs: {', '.join(sorted(ad_ids))}"
            reward = -0.05
        elif action.action_type == "investigate":
            reward = self._handle_investigate(action)
        elif action.action_type == "verdict":
            reward = self._handle_verdict(action)
        elif action.action_type == "link_accounts":
            reward = self._handle_link(action)
        else:
            self._last_feedback = f"Unknown action_type '{action.action_type}'."
            reward = -0.05

        self._cumulative_reward += reward

        done = self._check_done()
        if done and not self._done:
            end_reward = self._handle_episode_end()
            reward += end_reward
            self._cumulative_reward += end_reward
            self._done = True

        # Mirror bookkeeping into the public state object.
        self._state.remaining_budget = max(0, self._state.remaining_budget)
        self._state.reviewed_count = len(self._verdicts)
        self._state.verdicts = {
            ad_id: v.get("verdict", "") for ad_id, v in self._verdicts.items()
        }

        return self._build_observation(reward=reward, done=self._done)

    @property
    def state(self) -> AdFraudState:
        return self._state

    # ------------------------------------------------------------------
    # Action handlers
    # ------------------------------------------------------------------

    # Hard cap on TOTAL investigate attempts per ad — including the ones
    # the env rejects (duplicate target, ad already verdicted, etc.).
    # Once an ad's attempt count exceeds it, `_handle_investigate` rejects
    # the call with -0.10 and rotates focus to another pending ad.
    _MAX_INVESTIGATION_ATTEMPTS_PER_AD = 5
    # Tighter cap on SUCCESSFUL (budget-spending) pulls per ad; once reached,
    # new targets on that ad are rejected with -0.05 and a prompt to verdict.
    _MAX_INVESTIGATIONS_PER_AD = 3

    def _rotate_focus_off(self, current_ad_id: str) -> Optional[str]:
        """Move focus to the next pending ad that is NOT ``current_ad_id``.



        Returns the new focus ad_id, or ``None`` if there is no other

        pending ad. Used by ``_handle_investigate`` when the model is

        stuck looping on a single ad β€” rotating focus changes the

        ``Ad in Focus`` block in the next observation and breaks the

        prompt-anchoring effect that's keeping the 1.5B Investigator

        glued to one ad.

        """
        if self._episode is None:
            return None
        pending_others = [
            a.ad_id for a in self._episode.ads
            if a.ad_id not in self._verdicts and a.ad_id != current_ad_id
        ]
        if pending_others:
            self._focused_ad_id = pending_others[0]
            return pending_others[0]
        return None

    def _handle_investigate(self, action: AdReviewAction) -> float:
        """Handle an ``investigate`` action and return its step reward.

        Guards run in this order; each returns early with feedback and a
        penalty, without spending investigation budget:

        1. investigation budget exhausted (-0.02);
        2. ``investigation_target`` missing (-0.05);
        3. the ad already has a verdict (-0.02; focus rotates away);
        4. more than ``_MAX_INVESTIGATION_ATTEMPTS_PER_AD`` attempts on this
           ad (-0.10; focus rotates away);
        5. the target was already pulled for this ad (-0.02);
        6. ``_MAX_INVESTIGATIONS_PER_AD`` successful pulls already made on
           this ad (-0.05; focus rotates away).

        Every call, including rejected ones, increments the per-ad attempt
        counter before the guards run. A successful pull spends one unit of
        budget, records the target, fetches findings (from the shared
        registry when set, else the episode's ``investigation_data``), puts
        them in the feedback, and returns a small penalty that grows with the
        number of pulls on the ad.
        """
        ad_id = action.ad_id
        # Count EVERY attempt, including the ones about to be rejected —
        # this is the primary loop-breaking signal.
        attempts = self._investigation_attempts.get(ad_id, 0) + 1
        self._investigation_attempts[ad_id] = attempts

        if self._state.remaining_budget <= 0:
            self._last_feedback = "No budget remaining. You must render verdicts on remaining ads or end the episode."
            return -0.02

        if action.investigation_target is None:
            self._last_feedback = "investigation_target is required for action_type='investigate'."
            return -0.05

        if ad_id in self._verdicts:
            new_focus = self._rotate_focus_off(ad_id)
            self._last_feedback = (
                f"You already rendered a verdict on {ad_id}. "
                + (
                    f"Focus moved to {new_focus} — investigate or verdict that ad next."
                    if new_focus else
                    "All other ads are also verdicted; emit verdict actions for any remaining pending ads."
                )
            )
            return -0.02

        # Hard cap on TOTAL attempts (catches duplicate-target spam). Checked
        # before the duplicate-target branch so a model that keeps repeating
        # the same target hits this stronger rejection.
        if attempts > self._MAX_INVESTIGATION_ATTEMPTS_PER_AD:
            new_focus = self._rotate_focus_off(ad_id)
            done_targets = self._investigations.get(ad_id, [])
            self._last_feedback = (
                f"REJECTED: {ad_id} has been probed {attempts} times — over the cap of "
                f"{self._MAX_INVESTIGATION_ATTEMPTS_PER_AD}. "
                f"STOP TRYING TO INVESTIGATE {ad_id}. "
                f"Issue a verdict on {ad_id} NOW (action_type='verdict', verdict in "
                "{approve, reject, escalate}). "
                + (
                    f"Successful pulls on {ad_id} so far: {', '.join(done_targets)}. "
                    if done_targets else ""
                )
                + (
                    f"Focus moved to {new_focus}; investigate that ad if you'd rather "
                    "build evidence on a fresh ad first."
                    if new_focus else
                    "No other pending ads — verdict the remaining pending ads."
                )
            )
            return -0.10

        prev = self._investigations.setdefault(ad_id, [])
        target = action.investigation_target

        if target in prev:
            # Duplicate target: no budget spent. Offer the un-pulled targets
            # as a concrete next pick — but only while another pull is still
            # allowed. Once the successful-pull cap is reached, a "fresh"
            # target would just be rejected by the cap check below, so steer
            # the model straight to a verdict instead.
            if len(prev) < self._MAX_INVESTIGATIONS_PER_AD:
                known_targets = {
                    "advertiser_history", "landing_page", "payment_method",
                    "targeting_overlap", "campaign_structure", "policy_classifier",
                }
                remaining = sorted(known_targets - set(prev))
            else:
                remaining = []
            self._last_feedback = (
                f"You already investigated '{target}' for {ad_id} "
                f"(attempt {attempts}/{self._MAX_INVESTIGATION_ATTEMPTS_PER_AD}). "
                f"Either issue a verdict on {ad_id} now, OR pick a fresh target from "
                f"[{', '.join(remaining) if remaining else '(none left — verdict it)'}], "
                "OR investigate a different ad_id from the pending queue."
            )
            return -0.02

        # Sub-cap on successful pulls (tighter than the attempt cap).
        # Forces a verdict after _MAX_INVESTIGATIONS_PER_AD investigations.
        if len(prev) >= self._MAX_INVESTIGATIONS_PER_AD:
            new_focus = self._rotate_focus_off(ad_id)
            self._last_feedback = (
                f"{ad_id} has reached the {self._MAX_INVESTIGATIONS_PER_AD}-investigation cap "
                f"(already pulled: {', '.join(prev)}). Issue a verdict on {ad_id} now "
                "(action_type='verdict' with verdict in {approve, reject, escalate})."
                + (f" Focus moved to {new_focus}." if new_focus else "")
            )
            return -0.05

        self._state.remaining_budget -= 1
        prev.append(target)
        # Move focus to the investigated ad only while it has <= 2 pulls (or
        # nothing is focused). Past that, keep the existing focus so the
        # prompt doesn't re-anchor the model to an ad it should be rendering
        # a verdict on — nudging toward "investigate twice -> verdict ->
        # move on" instead of looping on one ad.
        if len(prev) <= 2 or self._focused_ad_id is None:
            self._focused_ad_id = ad_id

        if self._registry is not None:
            findings = self._registry.lookup(ad_id, target)
        else:
            findings = self._episode.investigation_data.get(ad_id, {}).get(
                target, "No data available for this investigation type."
            )

        # Escalating per-pull penalty so repeated pulls on one ad get more
        # expensive. Because the sub-cap above rejects further pulls, the
        # -0.10 tier is only reachable if _MAX_INVESTIGATIONS_PER_AD is
        # raised above 3.
        n_targets = len(prev)
        if n_targets <= 2:
            penalty = -0.02
        elif n_targets == 3:
            penalty = -0.05
        else:
            penalty = -0.10

        feedback_lines = [
            f"Investigation complete: {target} for {ad_id}.",
            f"--- Findings ---\n{findings}",
        ]
        if n_targets >= 2:
            feedback_lines.append(
                f"Note: you have now investigated {ad_id} {n_targets}x — "
                "issue a verdict on it instead of more investigations."
            )
        self._last_feedback = "\n".join(feedback_lines)
        return penalty

    def _handle_verdict(self, action: AdReviewAction) -> float:
        ad_id = action.ad_id

        if ad_id in self._verdicts:
            self._last_feedback = f"You already rendered a verdict on {ad_id}."
            return -0.02

        if action.verdict is None:
            self._last_feedback = "verdict field is required for action_type='verdict'."
            return -0.05

        confidence = action.confidence if action.confidence is not None else 0.5
        ad = self._get_ad(ad_id)
        ground_truth = ad.ground_truth_label if ad else "legit"
        severity = ad.severity if ad else 0.0

        self._verdicts[ad_id] = {
            "verdict": action.verdict,
            "confidence": confidence,
            "ground_truth": ground_truth,
        }

        reward = self._compute_verdict_reward(action.verdict, ground_truth, severity, confidence)

        pending = [a.ad_id for a in self._episode.ads if a.ad_id not in self._verdicts]
        self._last_feedback = (
            f"Verdict recorded for {ad_id}: {action.verdict} "
            f"(confidence: {confidence:.2f}). "
            f"{len(pending)} ad(s) remaining in queue."
        )

        if pending:
            self._focused_ad_id = pending[0]

        return reward

    def _handle_link(self, action: AdReviewAction) -> float:
        if action.linked_ad_id is None:
            self._last_feedback = "linked_ad_id is required for action_type='link_accounts'."
            return -0.05

        ad_ids = {a.ad_id for a in self._episode.ads}
        if action.linked_ad_id not in ad_ids:
            self._last_feedback = f"Invalid linked_ad_id '{action.linked_ad_id}'."
            return -0.05

        if action.ad_id == action.linked_ad_id:
            self._last_feedback = "Cannot link an ad to itself."
            return -0.05

        link_key = tuple(sorted([action.ad_id, action.linked_ad_id]))
        existing = {tuple(sorted([l["ad_id_1"], l["ad_id_2"]])) for l in self._links}
        if link_key in existing:
            self._last_feedback = f"Link between {action.ad_id} and {action.linked_ad_id} already recorded."
            return -0.02

        is_correct = self._check_link_correct(action.ad_id, action.linked_ad_id)

        self._links.append({
            "ad_id_1": action.ad_id,
            "ad_id_2": action.linked_ad_id,
            "reason": action.link_reason or "",
            "correct": is_correct,
        })

        self._last_feedback = (
            f"Network link recorded: {action.ad_id} <-> {action.linked_ad_id}. "
            f"Reason: {action.link_reason or 'not specified'}."
        )

        return 0.4 if is_correct else -0.25

    # ------------------------------------------------------------------
    # Reward computation
    # ------------------------------------------------------------------

    def _compute_verdict_reward(

        self, verdict: str, ground_truth: str, severity: float, confidence: float

    ) -> float:
        if verdict == "reject" and ground_truth == "fraud":
            return 0.3 + 0.1 * severity
        elif verdict == "approve" and ground_truth == "legit":
            return 0.1
        elif verdict == "escalate" and ground_truth == "escalate":
            return 0.15
        elif verdict == "reject" and ground_truth == "legit":
            return -0.35
        elif verdict == "approve" and ground_truth == "fraud":
            return -0.5
        elif verdict == "escalate":
            return -0.05
        elif verdict == "approve" and ground_truth == "escalate":
            return -0.15
        elif verdict == "reject" and ground_truth == "escalate":
            return -0.1
        else:
            return -0.05

    def _handle_episode_end(self) -> float:
        """Finalize the episode: auto-approve leftovers, grade, and summarize.

        Every ad still lacking a verdict is recorded as ``approve`` with
        ``confidence=0.0`` and ``auto_approved=True``. The resulting
        ``EpisodeRecord`` is scored by ``grade_episode``; the score is stored
        on ``self._state.grader_score`` and, with summary counts, in the
        module-level ``_last_grader_result``. A human-readable summary
        becomes ``self._last_feedback``.

        Returns:
            0.0 — this method adds no terminal shaped reward; the outcome is
            reported through the grader score.
        """
        # Auto-approve every ad that never received a verdict.
        unreviewed_fraud = 0
        for ad in self._episode.ads:
            if ad.ad_id in self._verdicts:
                continue
            self._verdicts[ad.ad_id] = {
                "verdict": "approve",
                "confidence": 0.0,
                "ground_truth": ad.ground_truth_label,
                "auto_approved": True,
            }
            if ad.ground_truth_label == "fraud":
                unreviewed_fraud += 1

        record = self._build_episode_record()
        grader_score = grade_episode(record)
        self._state.grader_score = grader_score

        # Decision statistics cover explicit (non-auto-approved) verdicts only.
        explicit = [v for v in self._verdicts.values() if not v.get("auto_approved")]
        outcomes = [(v["verdict"], v["ground_truth"]) for v in explicit]
        reviewed_count = len(explicit)
        total_ads = len(self._episode.ads)
        auto_approved_count = total_ads - reviewed_count
        correct_pairs = {("reject", "fraud"), ("approve", "legit"), ("escalate", "escalate")}
        total_correct = sum(1 for pair in outcomes if pair in correct_pairs)
        false_positives = outcomes.count(("reject", "legit"))
        false_negatives = outcomes.count(("approve", "fraud"))
        correct_links = sum(1 for link in self._links if link.get("correct"))
        incorrect_links = len(self._links) - correct_links

        global _last_grader_result
        _last_grader_result = {
            "task_id": self._state.task_id,
            "grader_score": grader_score,
            "episode_id": self._state.episode_id,
            "total_steps": self._state.step_count,
            "verdicts_rendered": reviewed_count,
            "correct_decisions": total_correct,
            "false_positives": false_positives,
            "false_negatives": false_negatives,
            "auto_approved": auto_approved_count,
            "unreviewed_fraud": unreviewed_fraud,
            "network_links_correct": correct_links,
            "network_links_incorrect": incorrect_links,
        }

        feedback_lines = [
            f"Episode complete. Grader score: {grader_score:.3f}/1.000",
            f"Verdicts rendered: {reviewed_count}/{total_ads}",
            f"Correct decisions: {total_correct}/{reviewed_count}",
            f"False positives (legit rejected): {false_positives}",
            f"False negatives (fraud approved): {false_negatives}",
            # Total auto-approved ads, plus how many of those were fraud.
            f"Unreviewed ads auto-approved: {auto_approved_count} ({unreviewed_fraud} fraud)",
        ]
        if self._links:
            feedback_lines.append(
                f"Network links: {correct_links} correct, {incorrect_links} incorrect"
            )
        self._last_feedback = "\n".join(feedback_lines)

        return 0.0

    def _build_episode_record(self) -> EpisodeRecord:
        """Package the episode into the ``EpisodeRecord`` consumed by the graders.

        Verdicts are emitted in queue order, only for ads that have one
        (auto-approved entries included and flagged via ``auto_approved``).
        Links, per-ad metadata (label and severity) and fraud-ring sizes are
        copied from ``self._links`` and the episode.
        """
        episode = self._episode

        verdicts: List[VerdictResult] = []
        for ad in episode.ads:
            entry = self._verdicts.get(ad.ad_id)
            if not entry:
                continue
            verdicts.append(
                VerdictResult(
                    ad_id=ad.ad_id,
                    verdict=entry["verdict"],
                    confidence=entry.get("confidence", 0.5),
                    ground_truth=entry["ground_truth"],
                    auto_approved=entry.get("auto_approved", False),
                )
            )

        links = [
            LinkResult(ad_id_1=link["ad_id_1"], ad_id_2=link["ad_id_2"], correct=link["correct"])
            for link in self._links
        ]
        metadata = [
            {"ad_id": ad.ad_id, "ground_truth": ad.ground_truth_label, "severity": ad.severity}
            for ad in episode.ads
        ]
        rings = episode.fraud_rings

        return EpisodeRecord(
            task_id=self._state.task_id,
            total_steps=self._state.step_count,
            action_budget=episode.task_config.action_budget,
            verdicts=verdicts,
            links=links,
            ads_metadata=metadata,
            n_fraud_rings=len(rings),
            ring_sizes=[len(ring.member_ad_ids) for ring in rings],
        )

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _check_done(self) -> bool:
        if self._episode is None:
            return True
        all_reviewed = all(
            ad.ad_id in self._verdicts for ad in self._episode.ads
        )
        steps_exhausted = self._state.step_count >= self._episode.task_config.action_budget
        return all_reviewed or steps_exhausted

    def _check_link_correct(self, ad_id_1: str, ad_id_2: str) -> bool:
        """Check if two ads share a fraud ring."""
        for ring in self._episode.fraud_rings:
            if ad_id_1 in ring.member_ad_ids and ad_id_2 in ring.member_ad_ids:
                return True
        return False

    def _get_ad(self, ad_id: str) -> Optional[Ad]:
        if self._episode is None:
            return None
        for ad in self._episode.ads:
            if ad.ad_id == ad_id:
                return ad
        return None

    def _build_observation(

        self,

        reward: float,

        done: bool,

        feedback_override: str | None = None,

    ) -> AdReviewObservation:
        feedback = feedback_override or self._last_feedback

        if self._episode is None:
            return AdReviewObservation(
                done=done,
                reward=reward,
                queue_summary="No episode loaded.",
                current_ad_info="",
                investigation_findings="",
                verdict_history_summary="",
                feedback=feedback,
                available_ads=[],
                queue_status={},
                queue_may_grow=self._queue_may_grow,
            )

        config = self._episode.task_config
        pending = [a for a in self._episode.ads if a.ad_id not in self._verdicts]
        reviewed = [a for a in self._episode.ads if a.ad_id in self._verdicts]

        steps_remaining = max(0, config.action_budget - self._state.step_count)

        # Surface the action-budget-vs-pending-ads pressure as a prominent
        # feedback banner. We use TWO triggers because they catch
        # different failure modes:
        #
        #   * `steps_remaining <= 2 * n_pending` β€” fires when you have
        #     less than ~2 actions per pending ad left. Catches the
        #     "looped on ad_001 for 20 steps and now there isn't time
        #     to verdict everyone" case the 1.5B Investigator falls into.
        #     This trigger doesn't depend on investigation budget β€” even
        #     with full investigation budget, if step budget is tight,
        #     verdicts MUST start now.
        #
        #   * `remaining_investigation_budget <= n_pending` β€” original
        #     trigger. Catches the case where investigation actions
        #     have actually been spent on real pulls, not on rejected
        #     duplicates.
        
        if not done and pending and feedback_override is None:
            n_pending = len(pending)
            budget = self._state.remaining_budget
            steps_left = steps_remaining
            steps_pressure = steps_left <= 2 * n_pending
            budget_pressure = budget <= n_pending
            if steps_pressure or budget_pressure:
                # Pick the tighter wording.
                if steps_pressure:
                    pressure_line = (
                        f"BUDGET PRESSURE: only {steps_left} step(s) left in this "
                        f"episode for {n_pending} pending ad(s). Stop investigating "
                        f"and START VERDICTING β€” issue verdict actions "
                        f"(action_type='verdict', verdict in approve/reject/escalate) "
                        f"on the pending ads using whatever evidence you have. "
                        f"Unverdict-ed ads auto-approve at audit and tank the score."
                    )
                else:
                    pressure_line = (
                        f"BUDGET PRESSURE: only {budget} investigation(s) left for "
                        f"{n_pending} pending ad(s). Stop investigating and START "
                        f"VERDICTING β€” issue verdict actions on the pending ads using "
                        f"whatever evidence you have (approve, reject, or escalate). "
                        f"Unverdict-ed ads auto-approve at audit time and tank the score."
                    )
                feedback = (
                    f"{pressure_line}\n\n{feedback}" if feedback else pressure_line
                )
        queue_summary = (
            f"Task: {config.name} ({config.difficulty})\n"
            f"Total ads: {config.queue_size} | "
            f"Reviewed: {len(reviewed)} | "
            f"Pending: {len(pending)} | "
            f"Steps remaining: {steps_remaining}/{config.action_budget} | "
            f"Investigation budget: {self._state.remaining_budget} | "
            f"Step: {self._state.step_count}"
        )

        current_ad_info = ""
        if self._focused_ad_id and not done:
            ad = self._get_ad(self._focused_ad_id)
            if ad and ad.ad_id not in self._verdicts:
                signals = ", ".join(ad.initial_risk_signals) if ad.initial_risk_signals else "None"
                investigated = self._investigations.get(ad.ad_id, [])
                attempts = self._investigation_attempts.get(ad.ad_id, 0)

                _all_targets = [
                    "advertiser_history", "landing_page", "payment_method",
                    "targeting_overlap", "campaign_structure", "policy_classifier",
                ]
                exhausted = [t for t in _all_targets if t in investigated]
                fresh = [t for t in _all_targets if t not in investigated]
                exhausted_line = (
                    f"ALREADY-EXHAUSTED targets for {ad.ad_id} (do NOT repeat): "
                    f"{', '.join(exhausted) if exhausted else 'none yet'}"
                )
                fresh_line = (
                    f"FRESH targets for {ad.ad_id} (you may pick one of these or verdict): "
                    f"{', '.join(fresh) if fresh else 'none β€” verdict this ad now'}"
                )

                # Contextual metadata visible before investigation
                profile = self._episode.advertiser_profiles.get(ad.ad_id)
                meta_lines = []
                if profile:
                    meta_lines.append(f"Advertiser country: {profile.country}")
                    meta_lines.append(f"Account age: {profile.account_age_days} days")
                    if profile.account_age_days < 30:
                        meta_lines.append("Flag: New account (< 30 days)")
                context_meta = "\n".join(meta_lines)

                from ..data.meta_policy_taxonomy import lookup as _meta_lookup

                policy_entry = _meta_lookup(ad.category)
                meta_policy_line = (
                    f"Meta policy lens: {policy_entry.citation_id} β€” "
                    f"{policy_entry.section} > {policy_entry.subsection}"
                )

                # If the model has spammed this ad past N attempts,
                # surface that loud-and-clear at the top of the focus
                # block so the next observation pushes harder toward
                # verdict instead of silently re-anchoring.
                stuck_banner = ""
                if attempts >= 3:
                    stuck_banner = (
                        f"STUCK ON {ad.ad_id}: you have attempted {attempts} "
                        f"investigate actions on this ad. ISSUE A VERDICT NOW.\n"
                    )

                current_ad_info = (
                    f"{stuck_banner}"
                    f"=== Ad in Focus: {ad.ad_id} ===\n"
                    f"Category: {ad.category}\n"
                    f"{meta_policy_line}\n"
                    f"Ad copy: \"{ad.ad_copy}\"\n"
                    f"Targeting: {ad.targeting_summary}\n"
                    f"Initial risk signals: {signals}\n"
                    f"{context_meta}\n"
                    f"{exhausted_line}\n"
                    f"{fresh_line}"
                )

        investigation_findings = ""
        for ad_id, targets in self._investigations.items():
            for target in targets:
                if self._registry is not None:
                    finding = self._registry.lookup(ad_id, target)
                else:
                    finding = self._episode.investigation_data.get(ad_id, {}).get(target, "")
                if finding and not finding.startswith("No data") and not finding.startswith("Unknown"):
                    investigation_findings += f"\n[{ad_id} / {target}]\n{finding}\n"

        manual_verdicts = {
            ad_id: v for ad_id, v in self._verdicts.items()
            if not v.get("auto_approved")
        }
        if manual_verdicts:
            counts = {"approve": 0, "reject": 0, "escalate": 0}
            by_decision = {"approve": [], "reject": [], "escalate": []}
            for ad_id, v in manual_verdicts.items():
                counts[v["verdict"]] = counts.get(v["verdict"], 0) + 1
                by_decision[v["verdict"]].append(ad_id)
            summary_parts = [f"{c} {k}" for k, c in counts.items() if c > 0]
            verdict_lines = [
                f"Reviewed {len(manual_verdicts)} ad(s): {', '.join(summary_parts)}."
            ]
            for decision in ("reject", "approve", "escalate"):
                if by_decision[decision]:
                    verdict_lines.append(
                        f"  {decision}: {', '.join(by_decision[decision])}"
                    )
            verdict_history_summary = "\n".join(verdict_lines)
        else:
            verdict_history_summary = "No verdicts yet."

        available_ads = [a.ad_id for a in pending]

        queue_status = {
            "total_ads": config.queue_size,
            "reviewed": len(reviewed),
            "pending": len(pending),
            "investigation_budget": self._state.remaining_budget,
            "steps_remaining": steps_remaining,
            "step": self._state.step_count,
            "task_id": config.task_id,
        }

        evidence_ledger = self._build_evidence_ledger()
        queue_digest = self._build_queue_digest(pending)
        decided_ads = self._build_decided_ads()

        return AdReviewObservation(
            done=done,
            reward=reward,
            queue_summary=queue_summary,
            current_ad_info=current_ad_info,
            investigation_findings=investigation_findings.strip(),
            verdict_history_summary=verdict_history_summary,
            feedback=feedback,
            available_ads=available_ads,
            queue_status=queue_status,
            queue_may_grow=self._queue_may_grow,
            evidence_ledger=evidence_ledger,
            queue_digest=queue_digest,
            decided_ads=decided_ads,
        )

    # Curated columns surfaced in the no-investigation queue digest so
    # the Investigator has SOMETHING to triage on for every pending ad,
    # not just the focused one. Mix of:
    #
    #   * Discriminative (used for link_accounts decisions when shared
    #     across ads):  payment_type, registrar, domain
    #   * Decoy / non-discriminative (deliberately included so the
    #     policy must learn not to weight them):
    #     category, country, account_age_days
    #
    # Total of ~6 columns × 12 ads ≈ 720 chars worst case, well within
    # prompt budget. payment_id / advertiser_id / targeting_fingerprint
    # are NOT exposed here — those are the high-signal columns that
    # MUST require an explicit investigate to avoid trivialising the
    # task; they remain in the evidence_ledger only.
    #
    # _QUEUE_DIGEST_MAX_ADS caps how many pending ads `_build_queue_digest`
    # emits rows for: only the first 12 ads, in the order they are passed
    # in, appear in the digest; any further pending ads are left out.
    _QUEUE_DIGEST_MAX_ADS = 12

    def _build_queue_digest(

        self, pending_ads: List[Ad]

    ) -> List[Dict[str, Any]]:
        if self._episode is None or not pending_ads:
            return []

        lp_map = getattr(self._episode, "landing_pages", {}) or {}
        profiles = getattr(self._episode, "advertiser_profiles", {}) or {}

        rows: List[Dict[str, Any]] = []
        for ad in pending_ads[: self._QUEUE_DIGEST_MAX_ADS]:
            row: Dict[str, Any] = {
                "ad_id": ad.ad_id,
                "category": ad.category,
            }
            profile = profiles.get(ad.ad_id)
            if profile is not None:
                row["country"] = profile.country
                row["account_age_days"] = profile.account_age_days
                row["payment_type"] = profile.payment_method_type
            lp = lp_map.get(ad.ad_id)
            if lp is not None:
                row["domain"] = getattr(lp, "domain", None)
                row["registrar"] = getattr(lp, "registrar", None)
            rows.append({k: v for k, v in row.items() if v is not None})
        return rows

    def _build_evidence_ledger(self) -> Dict[str, Dict[str, Any]]:
        """Assemble a per-ad structured evidence table for the Investigator.



        Delegates to :func:`build_evidence_ledger` so the Referee can reuse

        the exact same extraction logic (with a different ``ad_ids``

        selection) when building the Fraudster's ``my_proposal_signals``.

        """
        if self._episode is None:
            return {}
        candidate_ad_ids: set[str] = set(self._investigations.keys())
        if self._focused_ad_id:
            candidate_ad_ids.add(self._focused_ad_id)
        return build_evidence_ledger(
            episode=self._episode,
            registry=self._registry,
            ad_ids=candidate_ad_ids,
            investigations=self._investigations,
        )

    def _build_decided_ads(self) -> list[Dict[str, Any]]:
        """Build a per-decided-ad summary with verdict + key signals.



        Each entry carries the verdict, confidence, and a curated mix of

        discriminative + decoy parameters from the evidence ledger. This

        gives the Investigator structured memory of past decisions so it

        can detect cross-ad collisions for link_accounts.

        """
        if self._episode is None:
            return []

        decided_ad_ids = [
            ad_id for ad_id in self._verdicts
            if not self._verdicts[ad_id].get("auto_approved")
        ]
        if not decided_ad_ids:
            return []

        ledger = build_evidence_ledger(
            episode=self._episode,
            registry=self._registry,
            ad_ids=decided_ad_ids,
            investigations=self._investigations,
        )

        rows: list[Dict[str, Any]] = []
        for ad_id in decided_ad_ids:
            v = self._verdicts[ad_id]
            entry: Dict[str, Any] = {
                "ad_id": ad_id,
                "verdict": v.get("verdict", "?"),
                "confidence": v.get("confidence", 0.5),
            }
            signals = ledger.get(ad_id, {})
            entry.update(signals)
            rows.append(entry)
        return rows

    # ------------------------------------------------------------------
    # Referee integration hooks
    # ------------------------------------------------------------------

    def notify_queue_grew(self, new_ad_id: str) -> None:
        """Resynchronise queue bookkeeping after the Referee adds an ad.

        Intended to be called once the Referee's
        ``extend_episode_with_proposal`` has appended ``new_ad_id`` to the
        shared episode and registry.

        * Recomputes ``total_ads`` from the episode's ad list.
        * Moves the focus to ``new_ad_id`` when the Investigator is idle,
          meaning it has no focused ad or its focused ad already has a
          verdict.
        * Does nothing when no episode is loaded.
        """
        episode = self._episode
        if episode is None:
            return
        self._state.total_ads = len(episode.ads)

        current_focus = self._focused_ad_id
        idle = current_focus is None or current_focus in self._verdicts
        if idle:
            self._focused_ad_id = new_ad_id

    @property
    def episode(self) -> Optional[GeneratedEpisode]:
        """The currently loaded episode, or ``None`` if none is loaded.

        Exposed for the Referee. There is no setter, but the returned object
        is the live episode, not a copy.
        """
        return self._episode

    @property
    def registry(self) -> Optional[InvestigationToolRegistry]:
        """The shared investigation tool registry, or ``None`` if unset.

        There is no setter, but the live registry object is returned, not a
        copy.
        """
        return self._registry

    @property
    def verdicts(self) -> Dict[str, Dict[str, Any]]:
        """Snapshot of the verdicts recorded so far (Referee/auditor).

        The outer mapping and each per-ad verdict dict are both copied. This
        lets callers inspect or mutate the result without altering the
        environment's own records, the same way ``investigations`` copies
        each per-ad target list. Values inside a verdict dict are not copied
        any deeper.
        """
        return {ad_id: dict(record) for ad_id, record in self._verdicts.items()}

    @property
    def investigations(self) -> Dict[str, List[str]]:
        """Per-ad investigation targets pulled so far, as an independent copy.

        Both the mapping and each ad's target list are fresh objects, so
        mutating the result leaves the environment's bookkeeping untouched.
        """
        snapshot: Dict[str, List[str]] = {}
        for ad_id, targets in self._investigations.items():
            snapshot[ad_id] = [*targets]
        return snapshot

    @property
    def links(self) -> List[Dict[str, Any]]:
        """Snapshot of the recorded network links.

        The list and each link dict are both copied, so callers cannot alter
        the environment's recorded links through the result. Values inside a
        link dict are not copied any deeper.
        """
        return [dict(link) for link in self._links]


# Backwards-compatible alias.  Round 1 code, tests, clients, and external
# integrations import `AdFraudEnvironment` directly; keeping the symbol
# means the rename is zero-breakage.  This is a plain name binding, not a
# subclass: both names refer to the same class object, so `isinstance`
# checks and class attributes behave identically under either name.
AdFraudEnvironment = InvestigatorEnvironment