File size: 14,387 Bytes
26bf1c9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
"""

Scripted Fraudster policies (baselines + adversarial opponents).



All three policies consume a `FraudsterObservation` dict (as delivered over the

WebSocket) and return a `FraudsterAction` Pydantic instance.



    ScriptedFraudster    deterministic: propose 2 ads round 1, 1 ad thereafter,

                         commit_final once all proposals are used.

    ReactiveFraudster    adapts category + content based on which of its

                         prior proposals were rejected and which investigation

                         targets the Investigator kept pulling.

    GibberishFraudster   low-plausibility negative control: proposes random

                         gibberish ad copy (for sanity-checking plausibility

                         grader).

"""

from __future__ import annotations

import random
import string
from typing import Any, Dict, List, Optional

from ..models import FraudsterAction
from ._base import PolicyBase


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------


# Category labels used as legitimate-looking cover. `_choose_allowed` walks
# this tuple in order when the preferred category is not permitted, so earlier
# entries take priority as substitutes.
_LEGIT_CAMOUFLAGE_CATEGORIES = (
    "ecommerce",
    "saas",
    "local_service",
    "education",
    "fitness",
)
# Category labels for overtly fraudulent ads. GibberishFraudster samples from
# these concatenated with the camouflage labels above.
_FRAUD_CATEGORIES = (
    "fake_giveaway",
    "fake_crypto",
    "fake_weight_loss",
    "gray_area_supplements",
    "fake_emergency",
)

# Benign-looking ad templates. Each entry is a 4-tuple unpacked by the
# policies as (ad_copy, landing_page_blurb, category, targeting_summary).
# ScriptedFraudster picks from this list after round 1; ReactiveFraudster
# picks from it once at least one of its proposals has been rejected.
_CAMOUFLAGE_ADS = [
    (
        "Boost your small business inventory tracking with CloudLedger — 14-day free trial.",
        "CloudLedger helps shops manage SKUs, stock levels, and vendor orders.",
        "saas",
        "SMB owners, US, interests: accounting / inventory",
    ),
    (
        "Upgrade your home gym with adjustable dumbbells, now $129 with free shipping.",
        "Space-saving dumbbells that replace a full rack, ships in 48h.",
        "fitness",
        "Adults 25-45, US, home-gym interest",
    ),
    (
        "Online Python fundamentals course with projects and a live mentor — enroll today.",
        "8-week beginner Python bootcamp, weekly office hours, job-ready portfolio.",
        "education",
        "Career-switchers 22-35",
    ),
    (
        "Downtown dog-walking service — $18 per walk, insured handlers, book online.",
        "Reliable neighborhood dog walking with GPS tracking and photo updates.",
        "local_service",
        "Urban dog owners, city centers",
    ),
    (
        "Refurbished iPhone 13 256GB — warranty included, $399 this week only.",
        "Certified refurbished phones with 12-month warranty and 30-day returns.",
        "ecommerce",
        "Adults 18-55, price-sensitive shoppers",
    ),
]

# Overtly fraudulent ad templates, same 4-tuple layout as the camouflage list:
# (ad_copy, landing_page_blurb, category, targeting_summary).
# ScriptedFraudster picks from this list in round 1; ReactiveFraudster picks
# from it while none of its proposals have been rejected.
_FRAUD_ADS = [
    (
        "Free iPhone 15 — first 500 customers only, tap now!",
        "Giveaway ends at midnight, quick verify your shipping address.",
        "fake_giveaway",
        "Anyone, low-income targeting",
    ),
    (
        "Invest in MoonCoin — guaranteed 50x returns in 30 days, risk-free!",
        "Join our MoonCoin pre-sale, 10% bonus for first buyers.",
        "fake_crypto",
        "Men 18-35, crypto interest",
    ),
    (
        "Lose 20 pounds in 7 days with this one weird trick doctors hate!",
        "Revolutionary weight-loss pill that dissolves fat overnight.",
        "fake_weight_loss",
        "Women 25-55, weight-loss interest",
    ),
]


def _truncate(text: str, limit: int) -> str:
    """Return `text` shortened to at most `limit` characters.

    Text that already fits is returned unchanged. Longer text is cut and
    suffixed with "..." so the result is exactly `limit` characters long.

    When `limit` is below 3 there is no room for the ellipsis, so the text is
    hard-cut instead; a zero or negative `limit` yields an empty string.
    """
    if len(text) <= limit:
        return text
    if limit < 3:
        # `limit - 3` would be negative here and slice from the END of the
        # string, producing a result longer than `limit`.
        return text[: max(limit, 0)]
    return text[: limit - 3] + "..."


def _choose_allowed(
    preferred: str, observation: Dict[str, Any], *, fallback: str = "ecommerce"
) -> str:
    """Resolve the category to submit, honouring `allowed_categories`.

    Resolution order:
      1. `preferred`, when the observation lists no allowed categories (key
         missing, null, or empty) or lists `preferred` itself;
      2. the first camouflage category that is allowed;
      3. `fallback`, when it is allowed;
      4. the first allowed category.
    """
    permitted: List[str] = observation.get("allowed_categories") or []

    # No restriction published, or the wish is already permitted.
    if not permitted or preferred in permitted:
        return preferred

    camouflage_hit = next(
        (name for name in _LEGIT_CAMOUFLAGE_CATEGORIES if name in permitted),
        None,
    )
    if camouflage_hit is not None:
        return camouflage_hit

    return fallback if fallback in permitted else permitted[0]


# ---------------------------------------------------------------------------
# ScriptedFraudster
# ---------------------------------------------------------------------------


class ScriptedFraudster(PolicyBase):
    """
    Seeded, replayable baseline fraudster.

    Every call to `act` does exactly one of two things:

      * `commit_final` when `proposals_remaining` is zero, negative or missing;
      * `propose_ad` otherwise, using a random overt-fraud template while
        `round_number` is 1 and a random camouflage template in later rounds.

    How many proposals land in a given round depends on how often the caller
    invokes `act`; this class does not count its own actions.

    NOTE(review): earlier documentation described round 1 as "one camouflage,
    one overt fraud" and mentioned committing "on round N". The code does
    neither. Confirm the intended plan before changing behaviour, since this
    policy is meant for regression tests and as a baseline opponent.
    """

    def __init__(self, *, seed: int = 0) -> None:
        self._rng = random.Random(seed)
        self._seed = seed
        # Retained for backward compatibility; nothing in this class reads or
        # increments it.
        self._turn_actions_taken = 0

    def reset(self) -> None:
        """Re-seed the RNG so the next episode replays the same template picks."""
        self._rng = random.Random(self._seed)
        self._turn_actions_taken = 0

    def act(self, observation: Dict[str, Any]) -> FraudsterAction:
        """Return the next action for the given observation dict.

        Reads `proposals_remaining` (default 0), `actions_left_this_turn`
        (default 1) and `round_number` (default 1). `actions_left_this_turn`
        only influences the rationale text, not the chosen action.
        """
        remaining = int(observation.get("proposals_remaining", 0))
        actions_left = int(observation.get("actions_left_this_turn", 1))
        round_number = int(observation.get("round_number", 1))

        if remaining <= 0:
            return FraudsterAction(
                action_type="commit_final",
                rationale="No proposals left; locking in.",
            )

        # Both situations submit the same kind of proposal; only the
        # explanation attached to it differs.
        if actions_left <= 1:
            rationale = (
                "ScriptedFraudster: final slot of the turn, proposing once "
                "before yielding to Investigator."
            )
        else:
            rationale = "ScriptedFraudster: executing canonical round plan."

        ad_copy, blurb, cat, targeting = self._pick_ad(round_number)
        return FraudsterAction(
            action_type="propose_ad",
            ad_copy=_truncate(ad_copy, 2000),
            landing_page_blurb=_truncate(blurb, 2000),
            category=_choose_allowed(cat, observation),
            # Same 512-character cap ReactiveFraudster applies to this field.
            targeting_summary=_truncate(targeting, 512),
            rationale=rationale,
        )

    def _pick_ad(self, round_number: int) -> tuple:
        """Pick a template as (ad_copy, blurb, category, targeting).

        Round 1 draws from the fraud templates, any other round from the
        camouflage templates. Consumes one RNG draw per call.
        """
        pool = _FRAUD_ADS if round_number == 1 else _CAMOUFLAGE_ADS
        return self._rng.choice(pool)


# ---------------------------------------------------------------------------
# ReactiveFraudster
# ---------------------------------------------------------------------------


class ReactiveFraudster(PolicyBase):
    """
    Fraudster that adapts to the Investigator's visible behaviour.

    Decision order inside `act`:

      1. No proposals left -> `commit_final`.
      2. At least two of its own proposals approved -> 35% chance to
         `commit_final` early.
      3. If the Investigator has used any investigation target, each of this
         policy's pending/investigating queue items gets a 25% chance of a
         `modify_pending_ad` that swaps in a reassuring landing-page blurb.
      4. Otherwise `propose_ad`: a camouflage template once any of its
         proposals has been rejected, a fraud template before that. The blurb
         is prefixed with trust signals when `landing_page` is the most-pulled
         target; the targeting summary gains advertiser-tenure text when
         `advertiser_history` is.

    Observation collections that arrive as an explicit null are treated as
    empty, matching how `_choose_allowed` reads `allowed_categories`.
    """

    def __init__(self, *, seed: int = 0) -> None:
        self._rng = random.Random(seed)
        self._seed = seed

    def reset(self) -> None:
        """Re-seed the RNG so the next episode replays the same random draws."""
        self._rng = random.Random(self._seed)

    def act(self, observation: Dict[str, Any]) -> FraudsterAction:
        """Return the next action for the given observation dict."""
        remaining = int(observation.get("proposals_remaining", 0))
        actions_left = int(observation.get("actions_left_this_turn", 1))
        my_rejected = self._my_rejected_count(observation)
        my_approved = self._my_approved_count(observation)
        targets_emphasized = self._dominant_investigation_target(observation)

        if remaining <= 0:
            return FraudsterAction(
                action_type="commit_final",
                rationale=(
                    f"ReactiveFraudster: budget exhausted; {my_approved} of my "
                    f"proposals approved so far, locking in."
                ),
            )

        # The RNG is consulted only once two approvals exist (short-circuit),
        # which keeps the random stream identical for stricter Investigators.
        if my_approved >= 2 and self._rng.random() < 0.35:
            return FraudsterAction(
                action_type="commit_final",
                rationale=(
                    "ReactiveFraudster: Investigator is lenient; lock in current "
                    "slate before they harden."
                ),
            )

        mod = self._maybe_modify_pending(observation, targets_emphasized)
        if mod is not None:
            return mod

        if my_rejected >= 1:
            ad_copy, blurb, cat, targeting = self._rng.choice(_CAMOUFLAGE_ADS)
            rationale = (
                f"ReactiveFraudster: pivoting to camouflage ({cat}) after "
                f"{my_rejected} rejections; Investigator pulled "
                f"{targets_emphasized or 'no signal'}."
            )
        else:
            ad_copy, blurb, cat, targeting = self._rng.choice(_FRAUD_ADS)
            rationale = (
                "ReactiveFraudster: Investigator hasn't rejected me yet; "
                "testing a borderline/fraud template."
            )

        # Reinforce whichever field the Investigator inspects most often.
        if targets_emphasized == "landing_page":
            blurb = (
                "Trusted brand with 12,000+ reviews — verified customer "
                "testimonials, PCI-DSS compliant checkout, and a 30-day "
                "money-back guarantee. " + blurb
            )
        elif targets_emphasized == "advertiser_history":
            targeting = (
                targeting + "; advertiser active since 2018 with >3y domain age"
            )

        cat = _choose_allowed(cat, observation)
        if actions_left <= 1:
            rationale += " (final slot of this turn)"

        return FraudsterAction(
            action_type="propose_ad",
            ad_copy=_truncate(ad_copy, 2000),
            landing_page_blurb=_truncate(blurb, 2000),
            category=cat,
            targeting_summary=_truncate(targeting, 512),
            rationale=_truncate(rationale, 2000),
        )

    def _count_my_verdicts(self, observation: Dict[str, Any], verdict: str) -> int:
        """Count `prior_verdicts` entries flagged as mine with the given verdict."""
        # `or []` tolerates the key being present with a null value.
        verdicts = observation.get("prior_verdicts") or []
        return sum(
            1
            for v in verdicts
            if v.get("was_my_proposal") and v.get("verdict") == verdict
        )

    def _my_rejected_count(self, observation: Dict[str, Any]) -> int:
        """Number of this policy's proposals the Investigator rejected."""
        return self._count_my_verdicts(observation, "reject")

    def _my_approved_count(self, observation: Dict[str, Any]) -> int:
        """Number of this policy's proposals the Investigator approved."""
        return self._count_my_verdicts(observation, "approve")

    def _dominant_investigation_target(
        self, observation: Dict[str, Any]
    ) -> Optional[str]:
        """Return the most frequently used investigation target, or None.

        Tallies every target name across all values of
        `investigation_targets_used`. On a tie the target seen first wins.
        """
        targets_used: Dict[str, List[str]] = (
            observation.get("investigation_targets_used") or {}
        )
        counter: Dict[str, int] = {}
        for targets in targets_used.values():
            for t in targets or []:
                counter[t] = counter.get(t, 0) + 1
        if not counter:
            return None
        return max(counter, key=counter.get)

    def _maybe_modify_pending(
        self, observation: Dict[str, Any], targets_emphasized: Optional[str]
    ) -> Optional[FraudsterAction]:
        """Possibly return a `modify_pending_ad` action for one of my queued ads.

        Returns None when the Investigator has shown no target preference, or
        when no eligible queue item passes its 25% roll. One RNG draw is
        consumed per eligible item, stopping at the first hit.
        """
        if targets_emphasized is None:
            return None

        current_queue = observation.get("current_queue") or []
        for item in current_queue:
            if not item.get("is_my_proposal"):
                continue
            if item.get("status") not in ("pending", "investigating"):
                continue
            slot_index = item.get("slot_index")
            if slot_index is None:
                continue
            if self._rng.random() < 0.25:
                return FraudsterAction(
                    action_type="modify_pending_ad",
                    slot_index=int(slot_index),
                    new_landing_page_blurb=(
                        "Verified business with transparent pricing, real-time "
                        "order tracking, and 30-day returns. "
                    ),
                    rationale=(
                        "ReactiveFraudster: Investigator emphasizing "
                        f"{targets_emphasized}; strengthening pending slot "
                        f"{slot_index}."
                    ),
                )
        return None


# ---------------------------------------------------------------------------
# GibberishFraudster
# ---------------------------------------------------------------------------


class GibberishFraudster(PolicyBase):
    """
    Negative-control policy that submits nonsense text.

    Intended to exercise the plausibility grader: the ad copy and landing-page
    blurb are random lowercase letters and spaces, while the category is drawn
    from the fraud and camouflage labels combined.
    """

    def __init__(self, *, seed: int = 0) -> None:
        self._seed = seed
        self._rng = random.Random(seed)

    def reset(self) -> None:
        """Re-seed the RNG so the same gibberish sequence is replayed."""
        self._rng = random.Random(self._seed)

    def act(self, observation: Dict[str, Any]) -> FraudsterAction:
        """Propose a gibberish ad while budget remains, otherwise commit."""
        budget = int(observation.get("proposals_remaining", 0))
        if budget > 0:
            # Draw order matters for reproducibility: category first, then the
            # ad copy (length, characters), then the blurb (length, characters).
            candidate_pool = _FRAUD_CATEGORIES + _LEGIT_CAMOUFLAGE_CATEGORIES
            category = _choose_allowed(
                self._rng.choice(candidate_pool), observation
            )
            copy_text = self._random_gibberish(self._rng.randint(40, 120))
            blurb_text = self._random_gibberish(self._rng.randint(20, 80))
            return FraudsterAction(
                action_type="propose_ad",
                ad_copy=copy_text,
                landing_page_blurb=blurb_text,
                category=category,
                targeting_summary="adults",
                rationale="GibberishFraudster: random bytes.",
            )

        return FraudsterAction(
            action_type="commit_final",
            rationale="GibberishFraudster: budget gone.",
        )

    def _random_gibberish(self, length: int) -> str:
        """Build `length` characters sampled from a-z plus five space slots."""
        alphabet = string.ascii_lowercase + "     "  # include whitespace
        picked = [self._rng.choice(alphabet) for _ in range(length)]
        return "".join(picked)