File size: 3,522 Bytes
cabb44d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""
server/anti_exploit.py — Hard rule enforcement against reward hacking.

Checks run BEFORE any action executes. If any check fails, the step
returns an error response with a minimum reward penalty — the model
learns that exploit attempts are costly.

Rules enforced:
  1. No same action 3+ times in a row (spam detection)
  2. Re-applying an already-applied rec_id is blocked
  3. query_validator / query_analyst blocked when budget < 3
  4. apply blocked if rec_id not in pending_recs for this session
  5. apply blocked if rec_id was generated in a different session
  6. Steps beyond budget cap return done=True immediately
"""
from __future__ import annotations
from server.config import cfg


class ExploitDetected(Exception):
    """Raised when an anti-exploit rule is triggered."""
    def __init__(self, rule: str, detail: str):
        self.rule = rule
        self.detail = detail
        super().__init__(f"[{rule}] {detail}")


class AntiExploit:
    """Stateful exploit checker. One instance per session."""

    def __init__(self):
        self._action_history: list[str] = []

    def check(
        self,
        action: dict,
        budget_remaining: int,
        pending_recs: dict,
        applied_rec_ids: set,
    ) -> None:
        """
        Run all anti-exploit checks. Raises ExploitDetected on violation.
        Call this before executing any action in environment.step().
        """
        action_type = action.get("action", "")

        # Rule 1: budget exhausted
        if budget_remaining <= 0:
            raise ExploitDetected(
                "budget_exhausted",
                "No budget remaining. Episode should be done."
            )

        # Rule 2: same action streak
        self._action_history.append(action_type)
        if len(self._action_history) >= cfg.MAX_SAME_ACTION_STREAK:
            recent = self._action_history[-cfg.MAX_SAME_ACTION_STREAK:]
            if len(set(recent)) == 1 and recent[0] not in ("apply",):
                raise ExploitDetected(
                    "action_spam",
                    f"Action '{action_type}' repeated {cfg.MAX_SAME_ACTION_STREAK}+ times in a row. "
                    f"This is reward hacking — vary your strategy."
                )

        # Rule 3: expensive queries when budget is low
        expensive = {"query_validator", "query_analyst"}
        if action_type in expensive and budget_remaining < 3:
            raise ExploitDetected(
                "low_budget_expensive_query",
                f"'{action_type}' costs 2 budget but only {budget_remaining} remain. "
                f"Query cheaper agents first."
            )

        # Rule 4+5: apply checks
        if action_type == "apply":
            rec_id = action.get("rec_id", "")

            if not rec_id:
                raise ExploitDetected(
                    "missing_rec_id",
                    "apply action requires a 'rec_id'. Query an agent first."
                )

            if rec_id in applied_rec_ids:
                raise ExploitDetected(
                    "duplicate_apply",
                    f"rec_id '{rec_id}' has already been applied in this episode."
                )

            if rec_id not in pending_recs:
                raise ExploitDetected(
                    "invalid_rec_id",
                    f"rec_id '{rec_id}' not found. Available: {list(pending_recs.keys())}. "
                    f"Query an agent first."
                )

    def reset(self):
        self._action_history = []