# datacentric-env / server / anti_exploit.py
# Uploaded by Aswini-Kumar: "Upload server/anti_exploit.py with huggingface_hub"
# Commit cabb44d (verified)
"""
server/anti_exploit.py — Hard rule enforcement against reward hacking.

Checks run BEFORE any action executes. If any check fails, the step
returns an error response with a minimum reward penalty, so the model
learns that exploit attempts are costly.

Rules enforced:
1. No same action 3+ times in a row (spam detection; the threshold is
   cfg.MAX_SAME_ACTION_STREAK and "apply" is exempt)
2. Re-applying an already-applied rec_id is blocked
3. query_validator / query_analyst blocked when budget < 3
4. apply blocked if rec_id is missing or not in pending_recs for this session
5. apply blocked if rec_id was generated in a different session
6. Steps beyond budget cap return done=True immediately
"""
from __future__ import annotations
from server.config import cfg
class ExploitDetected(Exception):
    """Raised when an anti-exploit rule is triggered.

    Attributes are set from the constructor arguments:
        rule: Short identifier of the rule that fired.
        detail: Human-readable explanation of the violation.

    ``str(exc)`` is ``"[<rule>] <detail>"``.
    """

    def __init__(self, rule: str, detail: str):
        self.rule = rule
        self.detail = detail
        super().__init__(f"[{rule}] {detail}")

    def __reduce__(self):
        # Exception.args only holds the single formatted message, so the
        # default reconstruction (cls(*args)) would call __init__ with one
        # argument and fail. Rebuild from the original two arguments so the
        # exception survives pickle / copy (e.g. across worker processes).
        return (type(self), (self.rule, self.detail), self.__dict__)
class AntiExploit:
    """Stateful exploit checker. One instance per session.

    The instance remembers the action types it has checked so far so that
    it can detect the same action being repeated back-to-back. Call
    ``reset()`` to clear that memory.
    """

    def __init__(self):
        # Action types passed to check(), oldest first.
        self._action_history: list[str] = []

    @staticmethod
    def _is_hashable(value: object) -> bool:
        """Return True if *value* can be used in set / dict lookups."""
        try:
            hash(value)
        except TypeError:
            return False
        return True

    def check(
        self,
        action: dict,
        budget_remaining: int,
        pending_recs: dict,
        applied_rec_ids: set,
    ) -> None:
        """
        Run all anti-exploit checks. Raises ExploitDetected on violation.
        Call this before executing any action in environment.step().

        Args:
            action: Requested action. ``action["action"]`` is the action
                type; for ``"apply"`` actions ``action["rec_id"]`` names
                the recommendation to apply.
            budget_remaining: Budget left before this action runs.
            pending_recs: Mapping whose keys are the rec_ids that may be
                applied.
            applied_rec_ids: rec_ids that have already been applied.

        Raises:
            ExploitDetected: with ``rule`` set to one of
                ``budget_exhausted``, ``invalid_action``, ``action_spam``,
                ``low_budget_expensive_query``, ``missing_rec_id``,
                ``duplicate_apply`` or ``invalid_rec_id``.
        """
        action_type = action.get("action", "")

        # Budget exhausted: nothing else is evaluated (or recorded).
        if budget_remaining <= 0:
            raise ExploitDetected(
                "budget_exhausted",
                "No budget remaining. Episode should be done."
            )

        # Malformed action type: an unhashable value (list, dict, ...) would
        # crash the set-based checks below with TypeError, and, once stored
        # in the history, would keep crashing later valid steps. Reject it
        # as a rule violation and do not record it.
        if not self._is_hashable(action_type):
            raise ExploitDetected(
                "invalid_action",
                f"'action' must be an action name, got "
                f"{type(action_type).__name__}."
            )

        # Same-action streak. The action is recorded before the remaining
        # checks, so attempts blocked below still count toward the streak.
        self._action_history.append(action_type)
        streak = cfg.MAX_SAME_ACTION_STREAK
        if len(self._action_history) >= streak:
            recent = self._action_history[-streak:]
            # "apply" is exempt from the streak rule.
            if len(set(recent)) == 1 and recent[0] not in ("apply",):
                raise ExploitDetected(
                    "action_spam",
                    f"Action '{action_type}' repeated {cfg.MAX_SAME_ACTION_STREAK}+ times in a row. "
                    f"This is reward hacking — vary your strategy."
                )

        # Expensive queries are refused when the budget is low.
        expensive = {"query_validator", "query_analyst"}
        if action_type in expensive and budget_remaining < 3:
            raise ExploitDetected(
                "low_budget_expensive_query",
                f"'{action_type}' costs 2 budget but only {budget_remaining} remain. "
                f"Query cheaper agents first."
            )

        # Apply checks: rec_id must be present, not yet applied, and pending.
        if action_type == "apply":
            rec_id = action.get("rec_id", "")
            if not rec_id:
                raise ExploitDetected(
                    "missing_rec_id",
                    "apply action requires a 'rec_id'. Query an agent first."
                )
            # An unhashable rec_id can never be a set member or dict key;
            # treat it as unknown instead of letting the lookup raise
            # TypeError.
            lookup_ok = self._is_hashable(rec_id)
            if lookup_ok and rec_id in applied_rec_ids:
                raise ExploitDetected(
                    "duplicate_apply",
                    f"rec_id '{rec_id}' has already been applied in this episode."
                )
            if not lookup_ok or rec_id not in pending_recs:
                raise ExploitDetected(
                    "invalid_rec_id",
                    f"rec_id '{rec_id}' not found. Available: {list(pending_recs.keys())}. "
                    f"Query an agent first."
                )

    def reset(self):
        """Forget all recorded actions so streak detection starts fresh."""
        self._action_history = []