Spaces:
Sleeping
Sleeping
| """ | |
| server/anti_exploit.py — Hard rule enforcement against reward hacking. | |
| Checks run BEFORE any action executes. If any check fails, the step | |
| returns an error response with a minimum reward penalty — the model | |
| learns that exploit attempts are costly. | |
| Rules enforced: | |
| 1. No same action 3+ times in a row (spam detection; threshold is cfg.MAX_SAME_ACTION_STREAK, and "apply" is exempt) | |
| 2. Re-applying an already-applied rec_id is blocked | |
| 3. query_validator / query_analyst blocked when budget < 3 | |
| 4. apply blocked if rec_id not in pending_recs for this session | |
| 5. apply blocked if rec_id was generated in a different session (enforced by the same pending_recs lookup as rule 4; there is no separate session check) | |
| 6. Steps beyond budget cap return done=True immediately | |
| """ | |
| from __future__ import annotations | |
| from server.config import cfg | |
class ExploitDetected(Exception):
    """Signal that an action violated an anti-exploit rule.

    Attributes:
        rule: Short identifier of the rule that fired.
        detail: Human-readable explanation of the violation.

    The exception message is rendered as ``[rule] detail``.
    """

    def __init__(self, rule: str, detail: str):
        message = "[{}] {}".format(rule, detail)
        super().__init__(message)
        self.rule = rule
        self.detail = detail
class AntiExploit:
    """Stateful exploit checker. One instance per session.

    Remembers the action types passed to :meth:`check` so that streaks of
    the same action can be detected. Call :meth:`reset` to clear that
    history.
    """

    def __init__(self):
        # Action types seen by check(), oldest first.
        self._action_history: list[str] = []

    @staticmethod
    def _contains(container, key) -> bool:
        """Return ``key in container``, treating an unhashable key as absent."""
        try:
            return key in container
        except TypeError:
            # e.g. a list used as rec_id: it can never be a member of a
            # set or a key of a dict, so report "not present" instead of
            # letting the TypeError escape the rule check.
            return False

    def check(
        self,
        action: dict,
        budget_remaining: int,
        pending_recs: dict,
        applied_rec_ids: set,
    ) -> None:
        """
        Run all anti-exploit checks. Raises ExploitDetected on violation.
        Call this before executing any action in environment.step().

        Checks run in this order, and the first violation wins:

        1. ``budget_exhausted`` - ``budget_remaining <= 0``.
        2. ``action_spam`` - the last ``cfg.MAX_SAME_ACTION_STREAK``
           recorded actions are all the same type and that type is not
           ``"apply"``.
        3. ``low_budget_expensive_query`` - ``query_validator`` or
           ``query_analyst`` requested with ``budget_remaining < 3``.
        4. ``missing_rec_id`` / ``duplicate_apply`` / ``invalid_rec_id`` -
           validation of the ``rec_id`` of an ``apply`` action.

        Args:
            action: Action payload; the ``"action"`` key holds the action
                type and, for ``apply``, ``"rec_id"`` holds the target id.
            budget_remaining: Budget left before this action executes.
            pending_recs: Mapping keyed by the rec_ids that may be applied.
            applied_rec_ids: rec_ids that have already been applied.

        Raises:
            ExploitDetected: If any rule above is violated.
        """
        action_type = action.get("action", "")

        # Budget exhausted. Checked before the action is recorded, so calls
        # made after the budget runs out do not extend the history.
        if budget_remaining <= 0:
            raise ExploitDetected(
                "budget_exhausted",
                "No budget remaining. Episode should be done."
            )

        # Same-action streak. The action is recorded first, which means it
        # stays in the history even if this or a later rule rejects it.
        self._action_history.append(action_type)
        streak = cfg.MAX_SAME_ACTION_STREAK
        if len(self._action_history) >= streak:
            recent = self._action_history[-streak:]
            # "apply" is exempt from the streak rule.
            if len(set(recent)) == 1 and recent[0] not in ("apply",):
                raise ExploitDetected(
                    "action_spam",
                    f"Action '{action_type}' repeated {streak}+ times in a row. "
                    f"This is reward hacking — vary your strategy."
                )

        # Expensive queries when budget is low. The gate is "< 3", so the
        # message states that threshold rather than implying the cost (2)
        # alone is the limit.
        expensive = {"query_validator", "query_analyst"}
        if action_type in expensive and budget_remaining < 3:
            raise ExploitDetected(
                "low_budget_expensive_query",
                f"'{action_type}' costs 2 budget and is blocked when fewer than 3 remain; "
                f"only {budget_remaining} remain. "
                f"Query cheaper agents first."
            )

        # apply checks: rec_id must be present, not yet applied, and pending.
        if action_type == "apply":
            rec_id = action.get("rec_id", "")
            if not rec_id:
                raise ExploitDetected(
                    "missing_rec_id",
                    "apply action requires a 'rec_id'. Query an agent first."
                )
            if self._contains(applied_rec_ids, rec_id):
                raise ExploitDetected(
                    "duplicate_apply",
                    f"rec_id '{rec_id}' has already been applied in this episode."
                )
            if not self._contains(pending_recs, rec_id):
                raise ExploitDetected(
                    "invalid_rec_id",
                    f"rec_id '{rec_id}' not found. Available: {list(pending_recs.keys())}. "
                    f"Query an agent first."
                )

    def reset(self):
        """Forget all recorded actions so streak detection starts fresh."""
        self._action_history = []