"""
server/anti_exploit.py — Hard rule enforcement against reward hacking.

Checks run BEFORE any action executes. If any check fails, the step returns
an error response with a minimum reward penalty — the model learns that
exploit attempts are costly.

Rules enforced:
  1. No same action cfg.MAX_SAME_ACTION_STREAK+ times in a row
     (spam detection; "apply" is exempt from this rule)
  2. Re-applying an already-applied rec_id is blocked
  3. query_validator / query_analyst blocked when budget < 3
  4. apply blocked if rec_id not in pending_recs for this session
  5. apply blocked if rec_id was generated in a different session
     (enforced through the same pending_recs lookup as rule 4 — this
     assumes the caller passes session-scoped pending_recs)
  6. Steps beyond budget cap are rejected with "budget_exhausted"
     (the caller is expected to end the episode with done=True)
"""

from __future__ import annotations

from server.config import cfg

# Actions gated by the low-budget rule. Kept as a tuple (not a set) so that
# membership tests compare by equality and never hash the model-supplied value.
_EXPENSIVE_ACTIONS = ("query_validator", "query_analyst")


def _is_hashable(value: object) -> bool:
    """Return True if *value* can be looked up in a set or used as a dict key."""
    try:
        hash(value)
    except TypeError:
        return False
    return True


class ExploitDetected(Exception):
    """Raised when an anti-exploit rule is triggered.

    Attributes:
        rule: Short identifier of the violated rule (e.g. "action_spam").
        detail: Human-readable explanation of the violation.

    The exception message is formatted as "[rule] detail".
    """

    def __init__(self, rule: str, detail: str):
        self.rule = rule
        self.detail = detail
        super().__init__(f"[{rule}] {detail}")


class AntiExploit:
    """Stateful exploit checker. One instance per session.

    The only state kept is the list of action types seen by check(), which
    drives the same-action streak rule. Call reset() to clear it.
    """

    def __init__(self):
        self._action_history: list[str] = []

    def check(
        self,
        action: dict,
        budget_remaining: int,
        pending_recs: dict,
        applied_rec_ids: set,
    ) -> None:
        """
        Run all anti-exploit checks. Raises ExploitDetected on violation.
        Call this before executing any action in environment.step().

        Args:
            action: The requested action. Its "action" key holds the action
                type; "apply" actions additionally carry a "rec_id" key.
            budget_remaining: Budget left before this action executes.
            pending_recs: Recommendations available to apply, keyed by rec_id.
            applied_rec_ids: rec_ids that have already been applied.

        Raises:
            ExploitDetected: with ``rule`` set to one of
                "budget_exhausted", "action_spam",
                "low_budget_expensive_query", "missing_rec_id",
                "duplicate_apply" or "invalid_rec_id".

        Malformed (unhashable) "action" / "rec_id" values, e.g. a list or
        dict, are handled by the rules themselves instead of escaping as a
        TypeError: such an action type simply never matches a known action,
        and such a rec_id is reported as "invalid_rec_id".
        """
        action_type = action.get("action", "")

        # Rule 1: budget exhausted
        # Raised before the action is recorded, so these rejections do not
        # count toward the streak below.
        if budget_remaining <= 0:
            raise ExploitDetected(
                "budget_exhausted",
                "No budget remaining. Episode should be done."
            )

        # Rule 2: same action streak
        # The action is recorded before the rule is evaluated, so attempts
        # rejected from here on still count toward the streak.
        self._action_history.append(action_type)
        streak = cfg.MAX_SAME_ACTION_STREAK
        if len(self._action_history) >= streak:
            recent = self._action_history[-streak:]
            # Compare by equality rather than building a set: a set would
            # hash every entry and crash on an unhashable action value,
            # and keep crashing while that value stays in the window.
            if (
                recent
                and all(entry == recent[0] for entry in recent)
                and recent[0] != "apply"
            ):
                raise ExploitDetected(
                    "action_spam",
                    f"Action '{action_type}' repeated {streak}+ times in a row. "
                    f"This is reward hacking — vary your strategy."
                )

        # Rule 3: expensive queries when budget is low
        if action_type in _EXPENSIVE_ACTIONS and budget_remaining < 3:
            raise ExploitDetected(
                "low_budget_expensive_query",
                f"'{action_type}' costs 2 budget but only {budget_remaining} remain. "
                f"Query cheaper agents first."
            )

        # Rule 4+5: apply checks
        if action_type == "apply":
            rec_id = action.get("rec_id", "")
            if not rec_id:
                raise ExploitDetected(
                    "missing_rec_id",
                    "apply action requires a 'rec_id'. Query an agent first."
                )

            # An unhashable rec_id can never be a member of a set or a key
            # of a dict, so it is by definition neither applied nor pending.
            # Skip the lookups (which would raise TypeError) and report it
            # as not found.
            hashable = _is_hashable(rec_id)
            if hashable and rec_id in applied_rec_ids:
                raise ExploitDetected(
                    "duplicate_apply",
                    f"rec_id '{rec_id}' has already been applied in this episode."
                )
            if not hashable or rec_id not in pending_recs:
                raise ExploitDetected(
                    "invalid_rec_id",
                    f"rec_id '{rec_id}' not found. Available: {list(pending_recs.keys())}. "
                    f"Query an agent first."
                )

    def reset(self):
        """Forget all recorded actions, restarting streak detection."""
        self._action_history = []