Spaces:
Running
Running
| from __future__ import annotations | |
| import statistics | |
| from dataclasses import dataclass | |
| from typing import Any | |
| class RewardClaim: | |
| agent_id: str | |
| reward: float | |
| justification: str | |
| timestamp: int | |
| approved: bool | None = None | |
| class ActionRecord: | |
| agent_id: str | |
| action: dict[str, Any] | |
| timestamp: int | |
| class AuditLedger: | |
| """ | |
| Records reward claims and actions so an Auditor policy can detect hacking. | |
| """ | |
| def __init__(self) -> None: | |
| self._claims: list[RewardClaim] = [] | |
| self._actions: list[ActionRecord] = [] | |
| self._flags: dict[str, list[str]] = {} | |
| def record_claim( | |
| self, | |
| agent_id: str, | |
| reward: float, | |
| justification: str, | |
| timestamp: int, | |
| approved: bool | None = None, | |
| ) -> None: | |
| self._claims.append( | |
| RewardClaim( | |
| agent_id=agent_id, | |
| reward=max(0.0, min(1.0, reward)), | |
| justification=justification, | |
| timestamp=timestamp, | |
| approved=approved, | |
| ) | |
| ) | |
| def record_action(self, agent_id: str, action: dict[str, Any], timestamp: int) -> None: | |
| self._actions.append(ActionRecord(agent_id=agent_id, action=dict(action), timestamp=timestamp)) | |
| def flag(self, agent_id: str, reason: str) -> None: | |
| self._flags.setdefault(agent_id, []).append(reason) | |
| def clear(self, agent_id: str) -> None: | |
| self._flags.pop(agent_id, None) | |
| def anomaly_scores(self) -> dict[str, float]: | |
| """ | |
| Simple deterministic anomaly score in [0, 1]. | |
| High scores mean the agent's reward claims are unusually high, volatile, | |
| denied by the auditor, or already flagged. | |
| """ | |
| scores: dict[str, float] = {} | |
| for agent_id in self._agent_ids(): | |
| claims = [claim for claim in self._claims if claim.agent_id == agent_id] | |
| if not claims: | |
| scores[agent_id] = 0.0 | |
| continue | |
| rewards = [claim.reward for claim in claims] | |
| avg_reward = statistics.fmean(rewards) | |
| volatility = statistics.pstdev(rewards) if len(rewards) > 1 else 0.0 | |
| denied_rate = sum(1 for claim in claims if claim.approved is False) / len(claims) | |
| high_claim_rate = sum(1 for reward in rewards if reward >= 0.90) / len(rewards) | |
| flag_bonus = min(0.25, 0.08 * len(self._flags.get(agent_id, []))) | |
| score = ( | |
| 0.35 * high_claim_rate | |
| + 0.25 * max(0.0, avg_reward - 0.70) / 0.30 | |
| + 0.20 * min(1.0, volatility / 0.40) | |
| + 0.20 * denied_rate | |
| + flag_bonus | |
| ) | |
| scores[agent_id] = round(max(0.0, min(1.0, score)), 4) | |
| return scores | |
| def investigate(self, agent_id: str, window: int = 10) -> dict[str, Any]: | |
| latest_time = self._latest_timestamp() | |
| since = max(0, latest_time - window) | |
| claims = [ | |
| claim for claim in self._claims | |
| if claim.agent_id == agent_id and claim.timestamp >= since | |
| ] | |
| actions = [ | |
| record for record in self._actions | |
| if record.agent_id == agent_id and record.timestamp >= since | |
| ] | |
| rewards = [claim.reward for claim in claims] | |
| return { | |
| "agent_id": agent_id, | |
| "window": window, | |
| "claims": [claim.__dict__ for claim in claims], | |
| "actions": [record.__dict__ for record in actions], | |
| "avg_claimed_reward": round(statistics.fmean(rewards), 4) if rewards else 0.0, | |
| "denied_claims": sum(1 for claim in claims if claim.approved is False), | |
| "flags": list(self._flags.get(agent_id, [])), | |
| "anomaly_score": self.anomaly_scores().get(agent_id, 0.0), | |
| } | |
| def snapshot(self) -> dict[str, Any]: | |
| return { | |
| "claims": [claim.__dict__ for claim in self._claims], | |
| "actions": [record.__dict__ for record in self._actions], | |
| "anomaly_scores": self.anomaly_scores(), | |
| "flags": {agent: list(reasons) for agent, reasons in self._flags.items()}, | |
| } | |
| def _agent_ids(self) -> set[str]: | |
| return ( | |
| {claim.agent_id for claim in self._claims} | |
| | {record.agent_id for record in self._actions} | |
| | set(self._flags) | |
| ) | |
| def _latest_timestamp(self) -> int: | |
| timestamps = [claim.timestamp for claim in self._claims] + [ | |
| record.timestamp for record in self._actions | |
| ] | |
| return max(timestamps) if timestamps else 0 | |